aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/network_services/vnf_generic
diff options
context:
space:
mode:
authorDeepak S <deepak.s@linux.intel.com>2017-06-20 14:31:19 -0700
committerRoss Brattain <ross.b.brattain@intel.com>2017-08-08 08:54:23 -0700
commit5ce3b6f8c8b3217091e51a6041455738603d90b8 (patch)
treeca34e15a85d69e2b23ce498fead47761624ae42c /yardstick/network_services/vnf_generic
parent72778951d6b8968f562fb8fefa02a57159ea1b83 (diff)
NSB update
Refactored main NSB VNF classes accroding to class diagram https://wiki.opnfv.org/display/yardstick/NSB+class+diagram All the SampleVNFs have been separated and placed under the SampleVNF class. Added AutoConnectSSH to automatically create SSH conneciton on demand. Added VnfdHelper class to wrap the VNFD dictionary in prepartion for class-based modeling. Extracted DpdkVnfSetupEnvHelper for DPDK based VNF setup. Extracted Stats and other client config to ResourceHelper Had to replace dict_key_flatten with deepgetitem due to Python 2.7 Jinja2 infinite recursion. Change-Id: Ia8840e9c44cdbdf39aab6b02e6d2176b31937dc9 Signed-off-by: Deepak S <deepak.s@linux.intel.com> Signed-off-by: Edward MacGillivray <edward.s.macgillivray@intel.com> Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
Diffstat (limited to 'yardstick/network_services/vnf_generic')
-rw-r--r--yardstick/network_services/vnf_generic/vnf/base.py234
-rw-r--r--yardstick/network_services/vnf_generic/vnf/sample_vnf.py994
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_ping.py155
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py303
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_trex.py345
-rw-r--r--yardstick/network_services/vnf_generic/vnf/vpe_vnf.py545
-rw-r--r--yardstick/network_services/vnf_generic/vnfdgen.py64
7 files changed, 1570 insertions, 1070 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py
index 2df6037f3..955f9f03d 100644
--- a/yardstick/network_services/vnf_generic/vnf/base.py
+++ b/yardstick/network_services/vnf_generic/vnf/base.py
@@ -15,10 +15,6 @@
from __future__ import absolute_import
import logging
-import ipaddress
-import six
-
-from yardstick.network_services.utils import get_nsb_option
LOG = logging.getLogger(__name__)
@@ -61,192 +57,69 @@ class QueueFileWrapper(object):
self.q_out.get()
-class GenericVNF(object):
+class VnfdHelper(dict):
+
+ @property
+ def mgmt_interface(self):
+ return self["mgmt-interface"]
+
+ @property
+ def vdu(self):
+ return self['vdu']
+
+ @property
+ def vdu0(self):
+ return self.vdu[0]
+
+ @property
+ def interfaces(self):
+ return self.vdu0['external-interface']
+
+ @property
+ def kpi(self):
+ return self['benchmark']['kpi']
+
+ def find_virtual_interface(self, **kwargs):
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ virtual_intf = interface["virtual-interface"]
+ if virtual_intf[key] == value:
+ return interface
+
+ def find_interface(self, **kwargs):
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ if interface[key] == value:
+ return interface
+
+
+class VNFObject(object):
+
+ def __init__(self, name, vnfd):
+ super(VNFObject, self).__init__()
+ self.name = name
+ self.vnfd_helper = VnfdHelper(vnfd) # fixme: parse this into a structure
+
+
+class GenericVNF(VNFObject):
+
""" Class providing file-like API for generic VNF implementation """
- def __init__(self, vnfd):
- super(GenericVNF, self).__init__()
- self.vnfd = vnfd # fixme: parse this into a structure
+ def __init__(self, name, vnfd):
+ super(GenericVNF, self).__init__(name, vnfd)
# List of statistics we can obtain from this VNF
# - ETSI MANO 6.3.1.1 monitoring_parameter
- self.kpi = self._get_kpi_definition(vnfd)
+ self.kpi = self._get_kpi_definition()
# Standard dictionary containing params like thread no, buffer size etc
self.config = {}
self.runs_traffic = False
- self.name = "vnf__1" # name in topology file
- self.bin_path = get_nsb_option("bin_path", "")
- @classmethod
- def _get_kpi_definition(cls, vnfd):
+ def _get_kpi_definition(self):
""" Get list of KPIs defined in VNFD
:param vnfd:
:return: list of KPIs, e.g. ['throughput', 'latency']
"""
- return vnfd['benchmark']['kpi']
-
- @classmethod
- def get_ip_version(cls, ip_addr):
- """ get ip address version v6 or v4 """
- try:
- address = ipaddress.ip_address(six.text_type(ip_addr))
- except ValueError:
- LOG.error(ip_addr, " is not valid")
- return
- else:
- return address.version
-
- def _ip_to_hex(self, ip_addr):
- ip_x = ip_addr
- if self.get_ip_version(ip_addr) == 4:
- ip_to_convert = ip_addr.split(".")
- ip_octect = [int(octect) for octect in ip_to_convert]
- ip_x = "{0[0]:02X}{0[1]:02X}{0[2]:02X}{0[3]:02X}".format(ip_octect)
- return ip_x
-
- def _get_dpdk_port_num(self, name):
- for intf in self.vnfd['vdu'][0]['external-interface']:
- if name == intf['name']:
- return intf['virtual-interface']['dpdk_port_num']
-
- def _append_routes(self, ip_pipeline_cfg):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
-
- where = ip_pipeline_cfg.find("arp_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_add = ip_pipeline_cfg[where:]
-
- tmp = route_add.find('\n')
- route_add = route_add[tmp:]
-
- cmds = "arp_route_tbl ="
-
- for route in routing_table:
- net = self._ip_to_hex(route['network'])
- net_nm = self._ip_to_hex(route['netmask'])
- net_gw = self._ip_to_hex(route['gateway'])
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
-
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_add
-
- return ip_pipeline_cfg
-
- def _append_nd_routes(self, ip_pipeline_cfg):
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- where = ip_pipeline_cfg.find("nd_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_nd = ip_pipeline_cfg[where:]
-
- tmp = route_nd.find('\n')
- route_nd = route_nd[tmp:]
-
- cmds = "nd_route_tbl ="
-
- for route in routing_table:
- net = route['network']
- net_nm = route['netmask']
- net_gw = route['gateway']
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
-
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_nd
-
- return ip_pipeline_cfg
-
- def _get_port0localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0localip6 : %s", return_value)
- return return_value
-
- def _get_port1localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1localip6 : %s", return_value)
- return return_value
-
- def _get_port0prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['netmask']
- LOG.info("_get_port0prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port1prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['netmask']
- LOG.info("_get_port1prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port0gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0gateway6 : %s", return_value)
- return return_value
-
- def _get_port1gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1gateway6 : %s", return_value)
- return return_value
+ return self.vnfd_helper.kpi
def instantiate(self, scenario_cfg, context_cfg):
""" Prepare VNF for operation and start the VNF process/VM
@@ -284,11 +157,10 @@ class GenericVNF(object):
class GenericTrafficGen(GenericVNF):
""" Class providing file-like API for generic traffic generator """
- def __init__(self, vnfd):
- super(GenericTrafficGen, self).__init__(vnfd)
+ def __init__(self, name, vnfd):
+ super(GenericTrafficGen, self).__init__(name, vnfd)
self.runs_traffic = True
self.traffic_finished = False
- self.name = "tgen__1" # name in topology file
def run_traffic(self, traffic_profile):
""" Generate traffic on the wire according to the given params.
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
new file mode 100644
index 000000000..89c086d97
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -0,0 +1,994 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Base class implementation for generic vnf implementation """
+
+from __future__ import absolute_import
+
+import posixpath
+import time
+import logging
+import os
+import re
+import subprocess
+from collections import Mapping
+
+from multiprocessing import Queue, Value, Process
+
+from six.moves import cStringIO
+
+from yardstick.benchmark.contexts.base import Context
+from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.network_services.helpers.cpu import CpuSysCores
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
+from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
+from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.network_services.utils import get_nsb_option
+
+from stl.trex_stl_lib.trex_stl_client import STLClient
+from stl.trex_stl_lib.trex_stl_client import LoggerApi
+from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError
+
+from yardstick.ssh import AutoConnectSSH
+
+DPDK_VERSION = "dpdk-16.07"
+
+LOG = logging.getLogger(__name__)
+
+
+REMOTE_TMP = "/tmp"
+
+
+class VnfSshHelper(AutoConnectSSH):
+
+ def __init__(self, node, bin_path, wait=None):
+ self.node = node
+ kwargs = self.args_from_node(self.node)
+ if wait:
+ kwargs.setdefault('wait', wait)
+
+ super(VnfSshHelper, self).__init__(**kwargs)
+ self.bin_path = bin_path
+
+ @staticmethod
+ def get_class():
+ # must return static class name, anything else refers to the calling class
+ # i.e. the subclass, not the superclass
+ return VnfSshHelper
+
+ def copy(self):
+ # this copy constructor is different from SSH classes, since it uses node
+ return self.get_class()(self.node, self.bin_path)
+
+ def upload_config_file(self, prefix, content):
+ cfg_file = os.path.join(REMOTE_TMP, prefix)
+ LOG.debug(content)
+ file_obj = cStringIO(content)
+ self.put_file_obj(file_obj, cfg_file)
+ return cfg_file
+
+ def join_bin_path(self, *args):
+ return os.path.join(self.bin_path, *args)
+
+ def provision_tool(self, tool_path=None, tool_file=None):
+ if tool_path is None:
+ tool_path = self.bin_path
+ return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
+
+
+class SetupEnvHelper(object):
+
+ CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
+ CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
+ CORES = []
+ DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
+ PIPELINE_COMMAND = ''
+ VNF_TYPE = "SAMPLE"
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(SetupEnvHelper, self).__init__()
+ self.vnfd_helper = vnfd_helper
+ self.ssh_helper = ssh_helper
+ self.scenario_helper = scenario_helper
+
+ def _get_ports_gateway(self, name):
+ routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
+ for route in routing_table:
+ if name == route['if']:
+ return route['gateway']
+ return None
+
+ def build_config(self):
+ raise NotImplementedError
+
+ def setup_vnf_environment(self):
+ pass
+ # raise NotImplementedError
+
+ def tear_down(self):
+ raise NotImplementedError
+
+
+class DpdkVnfSetupEnvHelper(SetupEnvHelper):
+
+ APP_NAME = 'DpdkVnf'
+ DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
+ DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
+ FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
+
+ HW_DEFAULT_CORE = 3
+ SW_DEFAULT_CORE = 2
+
+ DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")
+
+ @staticmethod
+ def _update_packet_type(ip_pipeline_cfg, traffic_options):
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ @classmethod
+ def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
+ traffic_type = traffic_options['traffic_type']
+
+ if traffic_options['vnf_type'] is not cls.APP_NAME:
+ match_str = 'traffic_type = 4'
+ replace_str = 'traffic_type = {0}'.format(traffic_type)
+
+ elif traffic_type == 4:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv4'
+
+ else:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv6'
+
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
+ self.all_ports = None
+ self.bound_pci = None
+ self._dpdk_nic_bind = None
+ self.socket = None
+
+ @property
+ def dpdk_nic_bind(self):
+ if self._dpdk_nic_bind is None:
+ self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
+ return self._dpdk_nic_bind
+
+ def _setup_hugepages(self):
+ cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
+ hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
+
+ memory_path = \
+ '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
+ self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
+
+ if hugepages == "2048kB":
+ pages = 16384
+ else:
+ pages = 16
+
+ self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
+
+ def _get_dpdk_port_num(self, name):
+ interface = self.vnfd_helper.find_interface(name=name)
+ return interface['virtual-interface']['dpdk_port_num']
+
+ def build_config(self):
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ task_path = self.scenario_helper.task_path
+
+ lb_count = vnf_cfg.get('lb_count', 3)
+ lb_config = vnf_cfg.get('lb_config', 'SW')
+ worker_config = vnf_cfg.get('worker_config', '1C/1T')
+ worker_threads = vnf_cfg.get('worker_threads', 3)
+
+ traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
+ traffic_options = {
+ 'traffic_type': traffic_type,
+ 'pkt_type': 'ipv%s' % traffic_type,
+ 'vnf_type': self.VNF_TYPE,
+ }
+
+ config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ multiport = MultiPortConfig(self.scenario_helper.topology,
+ config_tpl_cfg,
+ config_basename,
+ self.vnfd_helper.interfaces,
+ self.VNF_TYPE,
+ lb_count,
+ worker_threads,
+ worker_config,
+ lb_config,
+ self.socket)
+
+ multiport.generate_config()
+ with open(self.CFG_CONFIG) as handle:
+ new_config = handle.read()
+
+ new_config = self._update_traffic_type(new_config, traffic_options)
+ new_config = self._update_packet_type(new_config, traffic_options)
+
+ self.ssh_helper.upload_config_file(config_basename, new_config)
+ self.ssh_helper.upload_config_file(script_basename,
+ multiport.generate_script(self.vnfd_helper))
+ self.all_ports = multiport.port_pair_list
+
+ LOG.info("Provision and start the %s", self.APP_NAME)
+ self._build_pipeline_kwargs()
+ return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
+
+ def _build_pipeline_kwargs(self):
+ tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
+ ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
+ self.pipeline_kwargs = {
+ 'cfg_file': self.CFG_CONFIG,
+ 'script': self.CFG_SCRIPT,
+ 'ports_len_hex': ports_len_hex,
+ 'tool_path': tool_path,
+ }
+
+ def _get_app_cpu(self):
+ if self.CORES:
+ return self.CORES
+
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ sys_obj = CpuSysCores(self.ssh_helper)
+ self.sys_cpu = sys_obj.get_core_socket()
+ num_core = int(vnf_cfg["worker_threads"])
+ if vnf_cfg.get("lb_config", "SW") == 'HW':
+ num_core += self.HW_DEFAULT_CORE
+ else:
+ num_core += self.SW_DEFAULT_CORE
+ app_cpu = self.sys_cpu[str(self.socket)][:num_core]
+ return app_cpu
+
+ def _get_cpu_sibling_list(self, cores=None):
+ if cores is None:
+ cores = self._get_app_cpu()
+ sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
+ awk_template = "awk -F: '{ print $1 }' < %s"
+ sys_path = "/sys/devices/system/cpu/"
+ cpu_topology = []
+ try:
+ for core in cores:
+ sys_cmd = sys_cmd_template % (sys_path, core)
+ cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
+ cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
+
+ return cpu_topology
+ except Exception:
+ return []
+
+ def _validate_cpu_cfg(self):
+ return self._get_cpu_sibling_list()
+
+ def _find_used_drivers(self):
+ cmd = "{0} -s".format(self.dpdk_nic_bind)
+ rc, dpdk_status, _ = self.ssh_helper.execute(cmd)
+
+ self.used_drivers = {
+ vpci: (index, driver)
+ for index, (vpci, driver)
+ in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
+ if any(b.endswith(vpci) for b in self.bound_pci)
+ }
+
+ def setup_vnf_environment(self):
+ self._setup_dpdk()
+ resource = self._setup_resources()
+ self._kill_vnf()
+ self._detect_drivers()
+ return resource
+
+ def _kill_vnf(self):
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+
+ def _setup_dpdk(self):
+ """ setup dpdk environment needed for vnf to run """
+
+ self._setup_hugepages()
+ self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
+
+ exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
+ if exit_status == 0:
+ return
+
+ dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
+ dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
+ exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
+ if exit_status != 0:
+ self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+
+ def _setup_resources(self):
+ interfaces = self.vnfd_helper.interfaces
+ self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
+
+ # what is this magic? how do we know which socket is for which port?
+ # what about quad-socket?
+ if any(v[5] == "0" for v in self.bound_pci):
+ self.socket = 0
+ else:
+ self.socket = 1
+
+ cores = self._validate_cpu_cfg()
+ return ResourceProfile(self.vnfd_helper, cores)
+
+ def _detect_drivers(self):
+ interfaces = self.vnfd_helper.interfaces
+
+ self._find_used_drivers()
+ for vpci, (index, _) in self.used_drivers.items():
+ try:
+ intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
+ except StopIteration:
+ pass
+ else:
+ intf1['dpdk_port_num'] = index
+
+ for vpci in self.bound_pci:
+ self._bind_dpdk('igb_uio', vpci)
+ time.sleep(2)
+
+ def _bind_dpdk(self, driver, vpci, force=True):
+ if force:
+ force = '--force '
+ else:
+ force = ''
+ cmd = self.DPDK_BIND_CMD.format(force=force,
+ dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci)
+ self.ssh_helper.execute(cmd)
+
+ def _detect_and_bind_dpdk(self, vpci, driver):
+ find_net_cmd = self.FIND_NET_CMD.format(vpci)
+ exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status == 0:
+ # already bound
+ return None
+ self._bind_dpdk(driver, vpci)
+ exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status != 0:
+ # failed to bind
+ return None
+ return stdout
+
+ def _bind_kernel_devices(self):
+ for intf in self.vnfd_helper.interfaces:
+ vi = intf["virtual-interface"]
+ stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
+ if stdout is not None:
+ vi["local_iface_name"] = posixpath.basename(stdout)
+
+ def tear_down(self):
+ for vpci, (_, driver) in self.used_drivers.items():
+ self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci))
+
+
+class ResourceHelper(object):
+
+ COLLECT_KPI = ''
+ MAKE_INSTALL = 'cd {0} && make && sudo make install'
+ RESOURCE_WORD = 'sample'
+
+ COLLECT_MAP = {}
+
+ def __init__(self, setup_helper):
+ super(ResourceHelper, self).__init__()
+ self.resource = None
+ self.setup_helper = setup_helper
+ self.ssh_helper = setup_helper.ssh_helper
+
+ def setup(self):
+ self.resource = self.setup_helper.setup_vnf_environment()
+
+ def generate_cfg(self):
+ pass
+
+ def _collect_resource_kpi(self):
+ result = {}
+ status = self.resource.check_if_sa_running("collectd")[0]
+ if status:
+ result = self.resource.amqp_collect_nfvi_kpi()
+
+ result = {"core": result}
+ return result
+
+ def start_collect(self):
+ self.resource.initiate_systemagent(self.ssh_helper.bin_path)
+ self.resource.start()
+ self.resource.amqp_process_for_nfvi_kpi()
+
+ def stop_collect(self):
+ if self.resource:
+ self.resource.stop()
+
+ def collect_kpi(self):
+ return self._collect_resource_kpi()
+
+
+class ClientResourceHelper(ResourceHelper):
+
+ RUN_DURATION = 60
+ QUEUE_WAIT_TIME = 5
+ SYNC_PORT = 1
+ ASYNC_PORT = 2
+
+ def __init__(self, setup_helper):
+ super(ClientResourceHelper, self).__init__(setup_helper)
+ self.vnfd_helper = setup_helper.vnfd_helper
+ self.scenario_helper = setup_helper.scenario_helper
+
+ self.client = None
+ self.client_started = Value('i', 0)
+ self.my_ports = None
+ self._queue = Queue()
+ self._result = {}
+ self._terminated = Value('i', 0)
+ self._vpci_ascending = None
+
+ def _build_ports(self):
+ self.my_ports = [0, 1]
+
+ def get_stats(self, *args, **kwargs):
+ try:
+ return self.client.get_stats(*args, **kwargs)
+ except STLStateError:
+ LOG.exception("TRex client not connected")
+ return {}
+
+ def generate_samples(self, key=None, default=None):
+ last_result = self.get_stats(self.my_ports)
+ key_value = last_result.get(key, default)
+
+ if not isinstance(last_result, Mapping): # added for mock unit test
+ self._terminated.value = 1
+ return {}
+
+ samples = {}
+ for vpci_idx, vpci in enumerate(self._vpci_ascending):
+ name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
+ # fixme: VNFDs KPIs values needs to be mapped to TRex structure
+ xe_value = last_result.get(vpci_idx, {})
+ samples[name] = {
+ "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
+ "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
+ "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
+ "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
+ "in_packets": int(xe_value.get("ipackets", 0)),
+ "out_packets": int(xe_value.get("opackets", 0)),
+ }
+ if key:
+ samples[name][key] = key_value
+ return samples
+
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ samples = self.generate_samples()
+ time.sleep(self.QUEUE_WAIT_TIME)
+ self._queue.put(samples)
+
+ def run_traffic(self, traffic_profile):
+ # fixme: fix passing correct trex config file,
+ # instead of searching the default path
+ self._build_ports()
+ self.client = self._connect()
+ self.client.reset(ports=self.my_ports)
+ self.client.remove_all_streams(self.my_ports) # remove all streams
+ traffic_profile.register_generator(self)
+
+ while self._terminated.value == 0:
+ self._run_traffic_once(traffic_profile)
+
+ self.client.stop(self.my_ports)
+ self.client.disconnect()
+ self._terminated.value = 0
+
+ def terminate(self):
+ self._terminated.value = 1 # stop client
+
+ def clear_stats(self, ports=None):
+ if ports is None:
+ ports = self.my_ports
+ self.client.clear_stats(ports=ports)
+
+ def start(self, ports=None, *args, **kwargs):
+ if ports is None:
+ ports = self.my_ports
+ self.client.start(ports=ports, *args, **kwargs)
+
+ def collect_kpi(self):
+ if not self._queue.empty():
+ kpi = self._queue.get()
+ self._result.update(kpi)
+ LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result))
+ return self._result
+
+ def _connect(self, client=None):
+ if client is None:
+ client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
+ server=self.vnfd_helper.mgmt_interface["ip"],
+ verbose_level=LoggerApi.VERBOSE_QUIET)
+
+ # try to connect with 5s intervals, 30s max
+ for idx in range(6):
+ try:
+ client.connect()
+ break
+ except STLError:
+ LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
+ time.sleep(5)
+ return client
+
+
+class Rfc2544ResourceHelper(object):
+
+ DEFAULT_CORRELATED_TRAFFIC = False
+ DEFAULT_LATENCY = False
+ DEFAULT_TOLERANCE = '0.0001 - 0.0001'
+
+ def __init__(self, scenario_helper):
+ super(Rfc2544ResourceHelper, self).__init__()
+ self.scenario_helper = scenario_helper
+ self._correlated_traffic = None
+ self.iteration = Value('i', 0)
+ self._latency = None
+ self._rfc2544 = None
+ self._tolerance_low = None
+ self._tolerance_high = None
+
+ @property
+ def rfc2544(self):
+ if self._rfc2544 is None:
+ self._rfc2544 = self.scenario_helper.all_options['rfc2544']
+ return self._rfc2544
+
+ @property
+ def tolerance_low(self):
+ if self._tolerance_low is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_low
+
+ @property
+ def tolerance_high(self):
+ if self._tolerance_high is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_high
+
+ @property
+ def correlated_traffic(self):
+ if self._correlated_traffic is None:
+ self._correlated_traffic = \
+ self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
+
+ return self._correlated_traffic
+
+ @property
+ def latency(self):
+ if self._latency is None:
+ self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
+ return self._latency
+
+ def get_rfc2544(self, name, default=None):
+ return self.rfc2544.get(name, default)
+
+ def get_rfc_tolerance(self):
+ tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
+ tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
+ self._tolerance_low = next(tolerance_iter)
+ self._tolerance_high = next(tolerance_iter, self.tolerance_low)
+
+
+class SampleVNFDeployHelper(object):
+
+ SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
+ REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
+ SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
+
+ def __init__(self, vnfd_helper, ssh_helper):
+ super(SampleVNFDeployHelper, self).__init__()
+ self.ssh_helper = ssh_helper
+ self.vnfd_helper = vnfd_helper
+
+ DISABLE_DEPLOY = True
+
+ def deploy_vnfs(self, app_name):
+ # temp disable for now
+ if self.DISABLE_DEPLOY:
+ return
+
+ vnf_bin = self.ssh_helper.join_bin_path(app_name)
+ exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
+ if not exit_status:
+ return
+
+ subprocess.check_output(["rm", "-rf", self.REPO_NAME])
+ subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
+ time.sleep(2)
+ self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
+ self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
+
+ build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
+ time.sleep(2)
+ http_proxy = os.environ.get('http_proxy', '')
+ https_proxy = os.environ.get('https_proxy', '')
+ cmd = "sudo -E %s --silent '%s' '%s'" % (build_script, http_proxy, https_proxy)
+ LOG.debug(cmd)
+ self.ssh_helper.execute(cmd)
+ vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
+ self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
+ self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
+
+
+class ScenarioHelper(object):
+
+ DEFAULT_VNF_CFG = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1,
+ }
+
+ def __init__(self, name):
+ self.name = name
+ self.scenario_cfg = None
+
+ @property
+ def task_path(self):
+ return self.scenario_cfg["task_path"]
+
+ @property
+ def nodes(self):
+ return self.scenario_cfg['nodes']
+
+ @property
+ def all_options(self):
+ return self.scenario_cfg["options"]
+
+ @property
+ def options(self):
+ return self.all_options[self.name]
+
+ @property
+ def vnf_cfg(self):
+ return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
+
+ @property
+ def topology(self):
+ return self.scenario_cfg['topology']
+
+
+class SampleVNF(GenericVNF):
+ """ Class providing file-like API for generic VNF implementation """
+
+ VNF_PROMPT = "pipeline>"
+ WAIT_TIME = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNF, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.all_ports = None
+ self.context_cfg = None
+ self.nfvi_context = None
+ self.pipeline_kwargs = {}
+ self.priv_ports = None
+ self.pub_ports = None
+ # TODO(esm): make QueueFileWrapper invert-able so that we
+ # never have to manage the queues
+ self.q_in = Queue()
+ self.q_out = Queue()
+ self.queue_wrapper = None
+ self.run_kwargs = {}
+ self.scenario_cfg = None
+ self.tg_port_pairs = None
+ self.used_drivers = {}
+ self.vnf_port_pairs = None
+ self._vnf_process = None
+
+ def _get_route_data(self, route_index, route_type):
+ route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
+ for _ in range(route_index):
+ next(route_iter, '')
+ return next(route_iter, {}).get(route_type, '')
+
+ def _get_port0localip6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0localip6 : %s", return_value)
+ return return_value
+
+ def _get_port1localip6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1localip6 : %s", return_value)
+ return return_value
+
+ def _get_port0prefixlen6(self):
+ return_value = self._get_route_data(0, 'netmask')
+ LOG.info("_get_port0prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port1prefixlen6(self):
+ return_value = self._get_route_data(1, 'netmask')
+ LOG.info("_get_port1prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port0gateway6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0gateway6 : %s", return_value)
+ return return_value
+
+ def _get_port1gateway6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1gateway6 : %s", return_value)
+ return return_value
+
+ def _start_vnf(self):
+ self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
+ self._vnf_process = Process(target=self._run)
+ self._vnf_process.start()
+
+ def _vnf_up_post(self):
+ pass
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.context_cfg = context_cfg
+ self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
+ # self.nfvi_context = None
+
+ self.deploy_helper.deploy_vnfs(self.APP_NAME)
+ self.resource_helper.setup()
+ self._start_vnf()
+
+ def wait_for_instantiate(self):
+ buf = []
+ time.sleep(self.WAIT_TIME) # Give some time for config to load
+ while True:
+ if not self._vnf_process.is_alive():
+ raise RuntimeError("%s VNF process died." % self.APP_NAME)
+
+ # TODO(esm): move to QueueFileWrapper
+ while self.q_out.qsize() > 0:
+ buf.append(self.q_out.get())
+ message = ''.join(buf)
+ if self.VNF_PROMPT in message:
+ LOG.info("%s VNF is up and running.", self.APP_NAME)
+ self._vnf_up_post()
+ self.queue_wrapper.clear()
+ self.resource_helper.start_collect()
+ return self._vnf_process.exitcode
+
+ if "PANIC" in message:
+ raise RuntimeError("Error starting %s VNF." %
+ self.APP_NAME)
+
+ LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
+ time.sleep(1)
+
+ def _build_run_kwargs(self):
+ self.run_kwargs = {
+ 'stdin': self.queue_wrapper,
+ 'stdout': self.queue_wrapper,
+ 'keep_stdin_open': True,
+ 'pty': True,
+ }
+
+ def _build_config(self):
+ return self.setup_helper.build_config()
+
+ def _run(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+ cmd = self._build_config()
+ # kill before starting
+ self.ssh_helper.execute("pkill {}".format(self.APP_NAME))
+
+ LOG.debug(cmd)
+ self._build_run_kwargs()
+ self.ssh_helper.run(cmd, **self.run_kwargs)
+
+ def vnf_execute(self, cmd, wait_time=2):
+ """ send cmd to vnf process """
+
+ LOG.info("%s command: %s", self.APP_NAME, cmd)
+ self.q_in.put("{}\r\n".format(cmd))
+ time.sleep(wait_time)
+ output = []
+ while self.q_out.qsize() > 0:
+ output.append(self.q_out.get())
+ return "".join(output)
+
+ def _tear_down(self):
+ pass
+
+ def terminate(self):
+ self.vnf_execute("quit")
+ if self._vnf_process:
+ self._vnf_process.terminate()
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+ self._tear_down()
+ self.resource_helper.stop_collect()
+
+ def get_stats(self, *args, **kwargs):
+ """
+ Method for checking the statistics
+
+ :return:
+ VNF statistics
+ """
+ cmd = 'p {0} stats'.format(self.APP_WORD)
+ out = self.vnf_execute(cmd)
+ return out
+
+ def collect_kpi(self):
+ stats = self.get_stats()
+ m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if m:
+ result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
+ result["collect_stats"] = self.resource_helper.collect_kpi()
+ else:
+ result = {
+ "packets_in": 0,
+ "packets_fwd": 0,
+ "packets_dropped": 0,
+ }
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+
+class SampleVNFTrafficGen(GenericTrafficGen):
+ """ Class providing file-like API for generic traffic generator """
+
+ APP_NAME = 'Sample'
+ RUN_WAIT = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+ self.name = "tgen__1" # name in topology file
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ClientResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.runs_traffic = True
+ self.traffic_finished = False
+ self.tg_port_pairs = None
+ self._tg_process = None
+ self._traffic_process = None
+
+ def _start_server(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.resource_helper.generate_cfg()
+ self.setup_helper.setup_vnf_environment()
+ self.resource_helper.setup()
+
+ LOG.info("Starting %s server...", self.APP_NAME)
+ self._tg_process = Process(target=self._start_server)
+ self._tg_process.start()
+
+ def wait_for_instantiate(self):
+ return self._wait_for_process()
+
+ def _check_status(self):
+ raise NotImplementedError
+
+ def _wait_for_process(self):
+ while True:
+ if not self._tg_process.is_alive():
+ raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
+ LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
+ time.sleep(1)
+ status = self._check_status()
+ if status == 0:
+ LOG.info("%s TG Server is up and running.", self.APP_NAME)
+ return self._tg_process.exitcode
+
+ def _traffic_runner(self, traffic_profile):
+ LOG.info("Starting %s client...", self.APP_NAME)
+ self.resource_helper.run_traffic(traffic_profile)
+
+ def run_traffic(self, traffic_profile):
+ """ Generate traffic on the wire according to the given params.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Mandatory.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ self._traffic_process = Process(target=self._traffic_runner,
+ args=(traffic_profile,))
+ self._traffic_process.start()
+ # Wait for traffic process to start
+ while self.resource_helper.client_started.value == 0:
+ time.sleep(self.RUN_WAIT)
+
+ return self._traffic_process.is_alive()
+
+ def listen_traffic(self, traffic_profile):
+ """ Listen to traffic with the given parameters.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Optional.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ pass
+
+ def verify_traffic(self, traffic_profile):
+ """ Verify captured traffic after it has ended. Optional.
+
+ :param traffic_profile:
+ :return: dict
+ """
+ pass
+
+ def collect_kpi(self):
+ result = self.resource_helper.collect_kpi()
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+ def terminate(self):
+ """ After this method finishes, all traffic processes should stop. Mandatory.
+
+ :return: True/False
+ """
+ self.traffic_finished = True
+ if self._traffic_process is not None:
+ self._traffic_process.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
index 000a91db4..e65296287 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
@@ -16,14 +16,13 @@
from __future__ import absolute_import
from __future__ import print_function
import logging
-import multiprocessing
import re
-import time
-import os
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import provision_tool
+from multiprocessing import Queue
+from ipaddress import IPv4Interface
+
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
LOG = logging.getLogger(__name__)
@@ -42,77 +41,59 @@ class PingParser(object):
if match:
# IMPORTANT: in order for the data to be properly taken
# in by InfluxDB, it needs to be converted to numeric types
- self.queue.put({"packets_received": float(match.group(1)),
- "rtt": float(match.group(2))})
+ self.queue.put({
+ "packets_received": float(match.group(1)),
+ "rtt": float(match.group(2)),
+ })
def close(self):
- ''' close the ssh connection '''
- pass
+ """ close the ssh connection """
+ self.closed = True
def clear(self):
- ''' clear queue till Empty '''
+ """ clear queue till Empty """
while self.queue.qsize() > 0:
self.queue.get()
-class PingTrafficGen(GenericTrafficGen):
+class PingSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ def setup_vnf_environment(self):
+ self._bind_kernel_devices()
+
+
+class PingTrafficGen(SampleVNFTrafficGen):
"""
This traffic generator can ping a single IP with pingsize
and target given in traffic profile
"""
- def __init__(self, vnfd):
- super(PingTrafficGen, self).__init__(vnfd)
+ TG_NAME = 'Ping'
+ RUN_WAIT = 4
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = PingSetupEnvHelper
+
+ super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
+ self._queue = Queue()
+ self._parser = PingParser(self._queue)
self._result = {}
- self._parser = None
- self._queue = None
- self._traffic_process = None
-
- mgmt_interface = vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- def _bind_device_kernel(self, connection):
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
- drivers = {intf["virtual-interface"]["vpci"]:
- intf["virtual-interface"]["driver"]
- for intf in self.vnfd["vdu"][0]["external-interface"]}
-
- commands = \
- ['"{0}" --force -b "{1}" "{2}"'.format(dpdk_nic_bind, value, key)
- for key, value in drivers.items()]
- for command in commands:
- connection.execute(command)
-
- for index, out in enumerate(self.vnfd["vdu"][0]["external-interface"]):
- vpci = out["virtual-interface"]["vpci"]
- net = "find /sys/class/net -lname '*{}*' -printf '%f'".format(vpci)
- out = connection.execute(net)[1]
- ifname = out.split('/')[-1].strip('\n')
- self.vnfd["vdu"][0]["external-interface"][index][
- "virtual-interface"]["local_iface_name"] = ifname
def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(PingTrafficGen, self).scale(flavor)
+ """ scale vnf-based on flavor input """
+ pass
- def instantiate(self, scenario_cfg, context_cfg):
- self._result = {"packets_received": 0, "rtt": 0}
- self._bind_device_kernel(self.connection)
+ def _check_status(self):
+ return self._tg_process.is_alive()
- def run_traffic(self, traffic_profile):
- self._queue = multiprocessing.Queue()
- self._parser = PingParser(self._queue)
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._parser))
- self._traffic_process.start()
- # Wait for traffic process to start
- time.sleep(4)
- return self._traffic_process.is_alive()
+ def instantiate(self, scenario_cfg, context_cfg):
+ self._result = {
+ "packets_received": 0,
+ "rtt": 0,
+ }
+ self.setup_helper.setup_vnf_environment()
def listen_traffic(self, traffic_profile):
""" Not needed for ping
@@ -122,38 +103,26 @@ class PingTrafficGen(GenericTrafficGen):
"""
pass
- def _traffic_runner(self, traffic_profile, filewrapper):
-
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
- external_interface = self.vnfd["vdu"][0]["external-interface"]
- virtual_interface = external_interface[0]["virtual-interface"]
- target_ip = virtual_interface["dst_ip"].split('/')[0]
- local_ip = virtual_interface["local_ip"].split('/')[0]
- local_if_name = \
- virtual_interface["local_iface_name"].split('/')[0]
- packet_size = traffic_profile.params["traffic_profile"]["frame_size"]
-
- run_cmd = []
-
- run_cmd.append("ip addr flush %s" % local_if_name)
- run_cmd.append("ip addr add %s/24 dev %s" % (local_ip, local_if_name))
- run_cmd.append("ip link set %s up" % local_if_name)
-
- for cmd in run_cmd:
- self.connection.execute(cmd)
-
- ping_cmd = ("ping -s %s %s" % (packet_size, target_ip))
- self.connection.run(ping_cmd, stdout=filewrapper,
+ def _traffic_runner(self, traffic_profile):
+ intf = self.vnfd_helper.interfaces[0]["virtual-interface"]
+ profile = traffic_profile.params["traffic_profile"]
+ cmd_kwargs = {
+ 'target_ip': IPv4Interface(intf["dst_ip"]).ip.exploded,
+ 'local_ip': IPv4Interface(intf["local_ip"]).ip.exploded,
+ 'local_if_name': intf["local_iface_name"].split('/')[0],
+ 'packet_size': profile["frame_size"],
+ }
+
+ cmd_list = [
+ "sudo ip addr flush {local_if_name}",
+ "sudo ip addr add {local_ip}/24 dev {local_if_name}",
+ "sudo ip link set {local_if_name} up",
+ ]
+
+ for cmd in cmd_list:
+ self.ssh_helper.execute(cmd.format(**cmd_kwargs))
+
+ ping_cmd = "ping -s {packet_size} {target_ip}"
+ self.ssh_helper.run(ping_cmd.format(**cmd_kwargs),
+ stdout=self._parser,
keep_stdin_open=True, pty=True)
-
- def collect_kpi(self):
- if not self._queue.empty():
- kpi = self._queue.get()
- self._result.update(kpi)
- return self._result
-
- def terminate(self):
- if self._traffic_process is not None:
- self._traffic_process.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
index 7da4b31e9..79e42e0a8 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
@@ -15,267 +15,98 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
import time
import logging
-import os
-import yaml
+from collections import Mapping
+from itertools import chain
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import get_nsb_option
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper
LOGGING = logging.getLogger(__name__)
-DURATION = 30
-WAIT_TIME = 3
-TREX_SYNC_PORT = 4500
-TREX_ASYNC_PORT = 4501
+class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper):
-class TrexTrafficGenRFC(GenericTrafficGen):
- """
- This class handles mapping traffic profile and generating
- traffic for rfc2544 testcase.
- """
-
- def __init__(self, vnfd):
- super(TrexTrafficGenRFC, self).__init__(vnfd)
- self._result = {}
- self._terminated = multiprocessing.Value('i', 0)
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.tc_file_name = None
- self.client = None
- self.my_ports = None
-
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
-
- def _generate_trex_cfg(self, vnfd):
- """
-
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
-
- cfg_file = []
- vpci = []
- port = {}
-
- ext_intf = vnfd["vdu"][0]["external-interface"]
- for interface in ext_intf:
- virt_intf = interface["virtual-interface"]
- vpci.append(virt_intf["vpci"])
-
- port["src_mac"] = \
- self._split_mac_address_into_list(virt_intf["local_mac"])
-
- time.sleep(WAIT_TIME)
- port["dest_mac"] = \
- self._split_mac_address_into_list(virt_intf["dst_mac"])
- if virt_intf["dst_mac"]:
- trex_cfg["port_info"].append(port.copy())
-
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
-
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
-
- self._vpci_ascending = sorted(vpci)
-
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGenRFC, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
- trex = os.path.join(self.bin_path, "trex")
- err, _, _ = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))
- if err != 0:
- self.connection.put(trex, trex, True)
+ def is_done(self):
+ return self.latency and self.iteration.value > 10
- LOGGING.debug("Starting TRex server...")
- _tg_server = \
- multiprocessing.Process(target=self._start_server)
- _tg_server.start()
- while True:
- LOGGING.info("Waiting for TG Server to start.. ")
- time.sleep(WAIT_TIME)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOGGING.info("TG server is up and running.")
- return _tg_server.exitcode
- if not _tg_server.is_alive():
- raise RuntimeError("Traffic Generator process died.")
+class TrexRfcResourceHelper(TrexResourceHelper):
- def listen_traffic(self, traffic_profile):
- pass
+ LATENCY_TIME_SLEEP = 120
+ RUN_DURATION = 30
+ WAIT_TIME = 3
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
+ def __init__(self, setup_helper, rfc_helper_type=None):
+ super(TrexRfcResourceHelper, self).__init__(setup_helper)
- def run_traffic(self, traffic_profile,
- client_started=multiprocessing.Value('i', 0)):
+ if rfc_helper_type is None:
+ rfc_helper_type = TrexRfc2544ResourceHelper
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- client_started, self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while client_started.value == 0:
- time.sleep(1)
+ self.rfc2544_helper = rfc_helper_type(self.scenario_helper)
+ # self.tg_port_pairs = []
- return self._traffic_process.is_alive()
+ def _build_ports(self):
+ self.tg_port_pairs, self.networks = MultiPortConfig.get_port_pairs(
+ self.vnfd_helper.interfaces)
+ self.priv_ports = [int(x[0][-1]) for x in self.tg_port_pairs]
+ self.pub_ports = [int(x[1][-1]) for x in self.tg_port_pairs]
+ self.my_ports = list(set(chain(self.priv_ports, self.pub_ports)))
- def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
-
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
- _server.execute("pkill -9 rex > /dev/null 2>&1")
-
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
-
- _server.execute(trex_cmd)
-
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOGGING.info("Unable to connect to Trex. Attempt %s", idx)
- time.sleep(WAIT_TIME)
- return client
-
- @classmethod
- def _get_rfc_tolerance(cls, tc_yaml):
- tolerance = '0.8 - 1.0'
- if 'tc_options' in tc_yaml['scenarios'][0]:
- tc_options = tc_yaml['scenarios'][0]['tc_options']
- if 'rfc2544' in tc_options:
- tolerance = \
- tc_options['rfc2544'].get('allowed_drop_rate', '0.8 - 1.0')
-
- tolerance = tolerance.split('-')
- min_tol = float(tolerance[0])
- if len(tolerance) == 2:
- max_tol = float(tolerance[1])
- else:
- max_tol = float(tolerance[0])
-
- return [min_tol, max_tol]
-
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOGGING.info("Starting TRex client...")
- tc_yaml = {}
-
- with open(self.tc_file_name) as tc_file:
- tc_yaml = yaml.load(tc_file.read())
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ self.client.stop(self.my_ports)
+ time.sleep(self.WAIT_TIME)
+ samples = traffic_profile.get_drop_percentage(self)
+ self._queue.put(samples)
- tolerance = self._get_rfc_tolerance(tc_yaml)
+ if not self.rfc2544_helper.is_done():
+ return
- # fixme: fix passing correct trex config file,
- # instead of searching the default path
- self.my_ports = [0, 1]
- self.client = self._connect_client()
+ self.client.stop(self.my_ports)
self.client.reset(ports=self.my_ports)
- self.client.remove_all_streams(self.my_ports) # remove all streams
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- time.sleep(DURATION)
+ self.client.remove_all_streams(self.my_ports)
+ traffic_profile.execute_latency(samples=samples)
+ multiplier = traffic_profile.calculate_pps(samples)[1]
+ for _ in range(5):
+ time.sleep(self.LATENCY_TIME_SLEEP)
self.client.stop(self.my_ports)
- time.sleep(WAIT_TIME)
+ time.sleep(self.WAIT_TIME)
last_res = self.client.get_stats(self.my_ports)
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- if not isinstance(last_res, dict):
- terminated.value = 1
- last_res = {}
+ if not isinstance(last_res, Mapping):
+ self._terminated.value = 1
+ continue
+ self.generate_samples('latency', {})
+ self._queue.put(samples)
+ self.client.start(mult=str(multiplier),
+ ports=self.my_ports,
+ duration=120, force=True)
- samples[name] = \
- {"rx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("rx_pps", 0.0)),
- "tx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("tx_pps", 0.0)),
- "rx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("rx_bps", 0.0)),
- "tx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("tx_bps", 0.0)),
- "in_packets":
- last_res.get(vpci_idx, {}).get("ipackets", 0),
- "out_packets":
- last_res.get(vpci_idx, {}).get("opackets", 0)}
+ def start_client(self, mult, duration, force=True):
+ self.client.start(ports=self.my_ports, mult=mult, duration=duration, force=force)
- samples = \
- traffic_profile.get_drop_percentage(self, samples,
- tolerance[0], tolerance[1])
- queue.put(samples)
- self.client.stop(self.my_ports)
- self.client.disconnect()
- queue.put(samples)
+ def clear_client_stats(self):
+ self.client.clear_stats(ports=self.my_ports)
def collect_kpi(self):
- if not self._queue.empty():
- result = self._queue.get()
- self._result.update(result)
- LOGGING.debug("trex collect Kpis %s", self._result)
- return self._result
+ self.rfc2544_helper.iteration.value += 1
+ super(TrexRfcResourceHelper, self).collect_kpi()
+
- def terminate(self):
- self._terminated.value = 1 # stop Trex clinet
+class TrexTrafficGenRFC(TrexTrafficGen):
+ """
+ This class handles mapping traffic profile and generating
+ traffic for rfc2544 testcase.
+ """
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexRfcResourceHelper
- if self._traffic_process:
- self._traffic_process.terminate()
+ super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
index 058b715fe..616b331ba 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
@@ -15,261 +15,136 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
-import time
+
import logging
import os
+
import yaml
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.common.utils import mac_address_to_hex_list
from yardstick.network_services.utils import get_nsb_option
-from yardstick.network_services.utils import provision_tool
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
LOG = logging.getLogger(__name__)
-DURATION = 30
-WAIT_QUEUE = 1
-TREX_SYNC_PORT = 4500
-TREX_ASYNC_PORT = 4501
-class TrexTrafficGen(GenericTrafficGen):
+class TrexResourceHelper(ClientResourceHelper):
+
+ CONF_FILE = '/tmp/trex_cfg.yaml'
+ QUEUE_WAIT_TIME = 1
+ RESOURCE_WORD = 'trex'
+ RUN_DURATION = 0
+
+ SYNC_PORT = 4500
+ ASYNC_PORT = 4501
+
+ def generate_cfg(self):
+ ext_intf = self.vnfd_helper.interfaces
+ vpci_list = []
+ port_list = []
+ trex_cfg = {
+ 'port_limit': 0,
+ 'version': '2',
+ 'interfaces': vpci_list,
+ 'port_info': port_list,
+ "port_limit": len(ext_intf),
+ "version": '2',
+ }
+ cfg_file = [trex_cfg]
+
+ for interface in ext_intf:
+ virtual_interface = interface['virtual-interface']
+ vpci_list.append(virtual_interface["vpci"])
+ dst_mac = virtual_interface["dst_mac"]
+
+ if not dst_mac:
+ continue
+
+ local_mac = virtual_interface["local_mac"]
+ port_list.append({
+ "src_mac": mac_address_to_hex_list(local_mac),
+ "dest_mac": mac_address_to_hex_list(dst_mac),
+ })
+
+ cfg_str = yaml.safe_dump(cfg_file, default_flow_style=False, explicit_start=True)
+ self.ssh_helper.upload_config_file(os.path.basename(self.CONF_FILE), cfg_str)
+ self._vpci_ascending = sorted(vpci_list)
+
+ def check_status(self):
+ status, _, _ = self.ssh_helper.execute("sudo lsof -i:%s" % self.SYNC_PORT)
+ return status
+
+ # temp disable
+ DISABLE_DEPLOY = True
+
+ def setup(self):
+ if self.DISABLE_DEPLOY:
+ return
+
+ trex_path = self.ssh_helper.join_bin_path('trex')
+
+ err = self.ssh_helper.execute("which {}".format(trex_path))[0]
+ if err == 0:
+ return
+
+ LOG.info("Copying %s to destination...", self.RESOURCE_WORD)
+ self.ssh_helper.run("sudo mkdir -p '{}'".format(os.path.dirname(trex_path)))
+ self.ssh_helper.put("~/.bash_profile", "~/.bash_profile")
+ self.ssh_helper.put(trex_path, trex_path, True)
+ ko_src = os.path.join(trex_path, "scripts/ko/src/")
+ self.ssh_helper.execute(self.MAKE_INSTALL.format(ko_src))
+
+ def start(self, ports=None, *args, **kwargs):
+ cmd = "sudo fuser -n tcp {0.SYNC_PORT} {0.ASYNC_PORT} -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd.format(self))
+
+ self.ssh_helper.execute("sudo pkill -9 rex > /dev/null 2>&1")
+
+ trex_path = self.ssh_helper.join_bin_path("trex", "scripts")
+ path = get_nsb_option("trex_path", trex_path)
+
+ # cmd = "sudo ./t-rex-64 -i --cfg %s > /dev/null 2>&1" % self.CONF_FILE
+ cmd = "./t-rex-64 -i --cfg '{}'".format(self.CONF_FILE)
+
+ # if there are errors we want to see them
+ # we have to sudo cd because the path might be owned by root
+ trex_cmd = """sudo bash -c "cd '{}' ; {}" >/dev/null""".format(path, cmd)
+ self.ssh_helper.execute(trex_cmd)
+
+ def terminate(self):
+ super(TrexResourceHelper, self).terminate()
+ cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT))
+
+
+class TrexTrafficGen(SampleVNFTrafficGen):
"""
This class handles mapping traffic profile and generating
traffic for given testcase
"""
- def __init__(self, vnfd):
- super(TrexTrafficGen, self).__init__(vnfd)
- self._result = {}
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.client = None
- self.my_ports = None
- self.client_started = multiprocessing.Value('i', 0)
-
- mgmt_interface = vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
-
- def _generate_trex_cfg(self, vnfd):
- """
-
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
-
- cfg_file = []
- vpci = []
- port = {}
-
- for interface in range(len(vnfd["vdu"][0]["external-interface"])):
- ext_intrf = vnfd["vdu"][0]["external-interface"]
- virtual_interface = ext_intrf[interface]["virtual-interface"]
- vpci.append(virtual_interface["vpci"])
-
- port["src_mac"] = self._split_mac_address_into_list(
- virtual_interface["local_mac"])
- port["dest_mac"] = self._split_mac_address_into_list(
- virtual_interface["dst_mac"])
-
- trex_cfg["port_info"].append(port.copy())
-
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
-
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
-
- self._vpci_ascending = sorted(vpci)
-
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
-
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
-
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
-
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
-
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
+ APP_NAME = 'TRex'
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexResourceHelper
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGen, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.setup_vnf_environment(self.connection)
-
- trex = os.path.join(self.bin_path, "trex")
- err = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))[0]
- if err != 0:
- LOG.info("Copying trex to destination...")
- self.connection.put("/root/.bash_profile", "/root/.bash_profile")
- self.connection.put(trex, trex, True)
- ko_src = os.path.join(trex, "scripts/ko/src/")
- self.connection.execute("cd %s && make && make install" % ko_src)
-
- LOG.info("Starting TRex server...")
- _tg_process = \
- multiprocessing.Process(target=self._start_server)
- _tg_process.start()
- while True:
- if not _tg_process.is_alive():
- raise RuntimeError("Traffic Generator process died.")
- LOG.info("Waiting for TG Server to start.. ")
- time.sleep(1)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOG.info("TG server is up and running.")
- return _tg_process.exitcode
-
- def listen_traffic(self, traffic_profile):
- pass
+ super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
-
- def run_traffic(self, traffic_profile):
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- self.client_started,
- self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while self.client_started.value == 0:
- time.sleep(1)
-
- return self._traffic_process.is_alive()
+ def _check_status(self):
+ return self.resource_helper.check_status()
def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
+ super(TrexTrafficGen, self)._start_server()
+ self.resource_helper.start()
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
+ def scale(self, flavor=""):
+ pass
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
-
- _server.execute(trex_cmd)
-
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- # try to connect with 5s intervals, 30s max
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
- time.sleep(5)
- return client
-
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOG.info("Starting TRex client...")
-
- self.my_ports = [0, 1]
- self.client = self._connect_client()
- self.client.reset(ports=self.my_ports)
-
- self.client.remove_all_streams(self.my_ports) # remove all streams
-
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- last_res = self.client.get_stats(self.my_ports)
- if not isinstance(last_res, dict): # added for mock unit test
- terminated.value = 1
- last_res = {}
-
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- xe_value = last_res.get(vpci_idx, {})
- samples[name] = \
- {"rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
- "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
- "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
- "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
- "in_packets": xe_value.get("ipackets", 0),
- "out_packets": xe_value.get("opackets", 0)}
- time.sleep(WAIT_QUEUE)
- queue.put(samples)
-
- self.client.disconnect()
- terminated.value = 0
-
- def collect_kpi(self):
- if not self._queue.empty():
- self._result.update(self._queue.get())
- LOG.debug("trex collect Kpis %s", self._result)
- return self._result
+ def listen_traffic(self, traffic_profile):
+ pass
def terminate(self):
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
- self.traffic_finished = True
- if self._traffic_process:
- self._traffic_process.terminate()
+ self.resource_helper.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
index e9e80bdfb..310ab67cb 100644
--- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
@@ -15,313 +15,268 @@
from __future__ import absolute_import
from __future__ import print_function
-import tempfile
-import time
import os
import logging
import re
-from multiprocessing import Queue
-import multiprocessing
-import ipaddress
-import six
+import posixpath
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
-from yardstick.network_services.utils import provision_tool
-from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
-from yardstick.network_services.nfvi.resource import ResourceProfile
+from six.moves import configparser, zip
-LOG = logging.getLogger(__name__)
-VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}'
-CORES = ['0', '1', '2']
-WAIT_TIME = 20
+from yardstick.network_services.pipeline import PipelineRules
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+LOG = logging.getLogger(__name__)
-class VpeApproxVnf(GenericVNF):
+VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}"""
+
+VPE_COLLECT_KPI = """\
+Pkts in:\s(\d+)\r\n\
+\tPkts dropped by Pkts in:\s(\d+)\r\n\
+\tPkts dropped by AH:\s(\d+)\r\n\\
+\tPkts dropped by other:\s(\d+)\
+"""
+
+
+class ConfigCreate(object):
+
+ @staticmethod
+ def vpe_tmq(config, index):
+ tm_q = 'TM{0}'.format(index)
+ config.add_section(tm_q)
+ config.set(tm_q, 'burst_read', '24')
+ config.set(tm_q, 'burst_write', '32')
+ config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg')
+ return config
+
+ def __init__(self, priv_ports, pub_ports, socket):
+ super(ConfigCreate, self).__init__()
+ self.sw_q = -1
+ self.sink_q = -1
+ self.n_pipeline = 1
+ self.priv_ports = priv_ports
+ self.pub_ports = pub_ports
+ self.pipeline_per_port = 9
+ self.socket = socket
+
+ def vpe_initialize(self, config):
+ config.add_section('EAL')
+ config.set('EAL', 'log_level', '0')
+
+ config.add_section('PIPELINE0')
+ config.set('PIPELINE0', 'type', 'MASTER')
+ config.set('PIPELINE0', 'core', 's%sC0' % self.socket)
+
+ config.add_section('MEMPOOL0')
+ config.set('MEMPOOL0', 'pool_size', '256K')
+
+ config.add_section('MEMPOOL1')
+ config.set('MEMPOOL1', 'pool_size', '2M')
+ return config
+
+ def vpe_rxq(self, config):
+ for port in self.pub_ports:
+ new_section = 'RXQ{0}.0'.format(port)
+ config.add_section(new_section)
+ config.set(new_section, 'mempool', 'MEMPOOL1')
+
+ return config
+
+ def get_sink_swq(self, parser, pipeline, k, index):
+ sink = ""
+ pktq = parser.get(pipeline, k)
+ if "SINK" in pktq:
+ self.sink_q += 1
+ sink = " SINK{0}".format(self.sink_q)
+ if "TM" in pktq:
+ sink = " TM{0}".format(index)
+ pktq = "SWQ{0}{1}".format(self.sw_q, sink)
+ return pktq
+
+ def vpe_upstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_upstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ if k == "pktq_in":
+ index = intf['index']
+ if "RXQ" in v:
+ value = "RXQ{0}.0".format(index)
+ else:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ elif k == "pktq_out":
+ index = intf['peer_intf']['index']
+ if "TXQ" in v:
+ value = "TXQ{0}.0".format(index)
+ else:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def vpe_downstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_downstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ index = intf['dpdk_port_num']
+ peer_index = intf['peer_intf']['dpdk_port_num']
+
+ if k == "pktq_in":
+ if "RXQ" not in v:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "RXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "RXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ if k == "pktq_out":
+ if "TXQ" not in v:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "TXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "TXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def create_vpe_config(self, vnf_cfg):
+ config = configparser.ConfigParser()
+ vpe_cfg = os.path.join("/tmp/vpe_config")
+ with open(vpe_cfg, 'w') as cfg_file:
+ config = self.vpe_initialize(config)
+ config = self.vpe_rxq(config)
+ config.write(cfg_file)
+ for index, priv_port in enumerate(self.priv_ports):
+ config = self.vpe_upstream(vnf_cfg, priv_port)
+ config.write(cfg_file)
+ config = self.vpe_downstream(vnf_cfg, priv_port)
+ config = self.vpe_tmq(config, index)
+ config.write(cfg_file)
+
+ def generate_vpe_script(self, interfaces):
+ rules = PipelineRules(pipeline_id=1)
+ for priv_port, pub_port in zip(self.priv_ports, self.pub_ports):
+ priv_intf = interfaces[priv_port]["virtual-interface"]
+ pub_intf = interfaces[pub_port]["virtual-interface"]
+
+ dst_port0_ip = priv_intf["dst_ip"]
+ dst_port1_ip = pub_intf["dst_ip"]
+ dst_port0_mac = priv_intf["dst_mac"]
+ dst_port1_mac = pub_intf["dst_mac"]
+
+ rules.add_firewall_script(dst_port0_ip)
+ rules.next_pipeline()
+ rules.add_flow_classification_script()
+ rules.next_pipeline()
+ rules.add_flow_action()
+ rules.next_pipeline()
+ rules.add_flow_action2()
+ rules.next_pipeline()
+ rules.add_route_script(dst_port1_ip, dst_port1_mac)
+ rules.next_pipeline()
+ rules.add_route_script2(dst_port0_ip, dst_port0_mac)
+ rules.next_pipeline(num=4)
+
+ return rules.get_string()
+
+
+class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ CFG_CONFIG = "/tmp/vpe_config"
+ CFG_SCRIPT = "/tmp/vpe_script"
+ CORES = ['0', '1', '2', '3', '4', '5']
+ PIPELINE_COMMAND = VPE_PIPELINE_COMMAND
+
+ def build_config(self):
+ vpe_vars = {
+ "bin_path": self.ssh_helper.bin_path,
+ "socket": self.socket,
+ }
+
+ all_ports = []
+ priv_ports = []
+ pub_ports = []
+ for interface in self.vnfd_helper.interfaces:
+ all_ports.append(interface['name'])
+ vld_id = interface['virtual-interface']['vld_id']
+ if vld_id.startswith('private'):
+ priv_ports.append(interface)
+ elif vld_id.startswith('public'):
+ pub_ports.append(interface)
+
+ vpe_conf = ConfigCreate(priv_ports, pub_ports, self.socket)
+ vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg)
+
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ with open(self.CFG_CONFIG) as handle:
+ vpe_config = handle.read()
+
+ self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars))
+
+ vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces)
+ self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars))
+
+
+class VpeApproxVnf(SampleVNF):
""" This class handles vPE VNF model-driver definitions """
- def __init__(self, vnfd):
- super(VpeApproxVnf, self).__init__(vnfd)
- self.socket = None
- self.q_in = Queue()
- self.q_out = Queue()
- self.vnf_cfg = None
- self._vnf_process = None
- self.connection = None
- self.resource = None
-
- def _resource_collect_start(self):
- self.resource.initiate_systemagent(self.bin_path)
- self.resource.start()
+ APP_NAME = 'vPE_vnf'
+ APP_WORD = 'vpe'
+ COLLECT_KPI = VPE_COLLECT_KPI
+ WAIT_TIME = 20
- def _resource_collect_stop(self):
- self.resource.stop()
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = VpeApproxSetupEnvHelper
- def _collect_resource_kpi(self):
- result = {}
+ super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
- status = self.resource.check_if_sa_running("collectd")[0]
- if status:
- result = self.resource.amqp_collect_nfvi_kpi()
-
- result = {"core": result}
-
- return result
-
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
-
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
-
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
-
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
-
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
-
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
-
- def _get_cpu_sibling_list(self):
- cpu_topo = []
- for core in CORES:
- sys_cmd = \
- "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \
- % core
- cpuid = \
- self.connection.execute("awk -F: '{ print $1 }' < %s" %
- sys_cmd)[1]
- cpu_topo += \
- [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')]
-
- return [cpu.strip() for cpu in cpu_topo]
-
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(VpeApproxVnf, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg']
-
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
-
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
-
- self.setup_vnf_environment(self.connection)
-
- cores = self._get_cpu_sibling_list()
- self.resource = ResourceProfile(self.vnfd, cores)
-
- self.connection.execute("pkill vPE_vnf")
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- self.socket = \
- next((0 for v in interfaces
- if v['virtual-interface']["vpci"][5] == "0"), 1)
-
- bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
- for vpci in bound_pci:
- self.connection.execute(
- "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci))
- queue_wrapper = \
- QueueFileWrapper(self.q_in, self.q_out, "pipeline>")
- self._vnf_process = multiprocessing.Process(target=self._run_vpe,
- args=(queue_wrapper,
- vnf_cfg,))
- self._vnf_process.start()
- buf = []
- time.sleep(WAIT_TIME) # Give some time for config to load
- while True:
- message = ''
- while self.q_out.qsize() > 0:
- buf.append(self.q_out.get())
- message = ''.join(buf)
- if "pipeline>" in message:
- LOG.info("VPE VNF is up and running.")
- queue_wrapper.clear()
- self._resource_collect_start()
- return self._vnf_process.exitcode
- if "PANIC" in message:
- raise RuntimeError("Error starting vPE VNF.")
-
- LOG.info("Waiting for VNF to start.. ")
- time.sleep(3)
- if not self._vnf_process.is_alive():
- raise RuntimeError("vPE VNF process died.")
-
- def _get_ports_gateway(self, name):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
-
- for route in routing_table:
- if name == route['if']:
- return route['gateway']
-
- def terminate(self):
- self.execute_command("quit")
- if self._vnf_process:
- self._vnf_process.terminate()
-
- def _run_vpe(self, filewrapper, vnf_cfg):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- port0_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"],
- interfaces[0]["virtual-interface"]["netmask"])))
- port1_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"],
- interfaces[1]["virtual-interface"]["netmask"])))
- dst_port0_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"],
- interfaces[0]["virtual-interface"]["netmask"]))
- dst_port1_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"],
- interfaces[1]["virtual-interface"]["netmask"]))
-
- vpe_vars = {"port0_local_ip": port0_ip.ip.exploded,
- "port0_dst_ip": dst_port0_ip.ip.exploded,
- "port0_local_ip_hex":
- self._ip_to_hex(port0_ip.ip.exploded),
- "port0_prefixlen": port0_ip.network.prefixlen,
- "port0_netmask": port0_ip.network.netmask.exploded,
- "port0_netmask_hex":
- self._ip_to_hex(port0_ip.network.netmask.exploded),
- "port0_local_mac":
- interfaces[0]["virtual-interface"]["local_mac"],
- "port0_dst_mac":
- interfaces[0]["virtual-interface"]["dst_mac"],
- "port0_gateway":
- self._get_ports_gateway(interfaces[0]["name"]),
- "port0_local_network":
- port0_ip.network.network_address.exploded,
- "port0_prefix": port0_ip.network.prefixlen,
- "port1_local_ip": port1_ip.ip.exploded,
- "port1_dst_ip": dst_port1_ip.ip.exploded,
- "port1_local_ip_hex":
- self._ip_to_hex(port1_ip.ip.exploded),
- "port1_prefixlen": port1_ip.network.prefixlen,
- "port1_netmask": port1_ip.network.netmask.exploded,
- "port1_netmask_hex":
- self._ip_to_hex(port1_ip.network.netmask.exploded),
- "port1_local_mac":
- interfaces[1]["virtual-interface"]["local_mac"],
- "port1_dst_mac":
- interfaces[1]["virtual-interface"]["dst_mac"],
- "port1_gateway":
- self._get_ports_gateway(interfaces[1]["name"]),
- "port1_local_network":
- port1_ip.network.network_address.exploded,
- "port1_prefix": port1_ip.network.prefixlen,
- "port0_local_ip6": self._get_port0localip6(),
- "port1_local_ip6": self._get_port1localip6(),
- "port0_prefixlen6": self._get_port0prefixlen6(),
- "port1_prefixlen6": self._get_port1prefixlen6(),
- "port0_gateway6": self._get_port0gateway6(),
- "port1_gateway6": self._get_port1gateway6(),
- "port0_dst_ip_hex6": self._get_port0localip6(),
- "port1_dst_ip_hex6": self._get_port1localip6(),
- "port0_dst_netmask_hex6": self._get_port0prefixlen6(),
- "port1_dst_netmask_hex6": self._get_port1prefixlen6(),
- "bin_path": self.bin_path,
- "socket": self.socket}
-
- for cfg in os.listdir(vnf_cfg):
- vpe_config = ""
- with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg:
- vpe_config = vpe_cfg.read()
-
- self._provide_config_file(cfg, vpe_config, vpe_vars)
-
- LOG.info("Provision and start the vPE")
- tool_path = provision_tool(self.connection,
- os.path.join(self.bin_path, "vPE_vnf"))
- cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config",
- script="/tmp/vpe_script",
- tool_path=tool_path)
- self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper,
- keep_stdin_open=True, pty=True)
-
- def _provide_config_file(self, prefix, template, args):
- cfg, cfg_content = tempfile.mkstemp()
- cfg = os.fdopen(cfg, "w+")
- cfg.write(template.format(**args))
- cfg.close()
- cfg_file = "/tmp/%s" % prefix
- self.connection.put(cfg_content, cfg_file)
- return cfg_file
-
- def execute_command(self, cmd):
- ''' send cmd to vnf process '''
- LOG.info("VPE command: %s", cmd)
- output = []
- if self.q_in:
- self.q_in.put(cmd + "\r\n")
- time.sleep(3)
- while self.q_out.qsize() > 0:
- output.append(self.q_out.get())
- return "".join(output)
+ def get_stats(self, *args, **kwargs):
+ raise NotImplementedError
def collect_kpi(self):
- result = self.get_stats_vpe()
- collect_stats = self._collect_resource_kpi()
- result["collect_stats"] = collect_stats
- LOG.debug("vPE collet Kpis: %s", result)
- return result
-
- def get_stats_vpe(self):
- ''' get vpe statistics '''
- result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0,
- 'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0}
- up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0',
- 'p 5 stats port out 1']
- down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0']
- pattern = \
- "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \
- "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)"
-
- for cmd in up_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_up_stream"] = \
- result.get("pkt_in_up_stream", 0) + int(match.group(1))
- result["pkt_drop_up_stream"] = \
- result.get("pkt_drop_up_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
-
- for cmd in down_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_down_stream"] = \
- result.get("pkt_in_down_stream", 0) + int(match.group(1))
- result["pkt_drop_down_stream"] = \
- result.get("pkt_drop_down_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
+ result = {
+ 'pkt_in_up_stream': 0,
+ 'pkt_drop_up_stream': 0,
+ 'pkt_in_down_stream': 0,
+ 'pkt_drop_down_stream': 0,
+ 'collect_stats': self.resource_helper.collect_kpi(),
+ }
+
+ indexes_in = [1]
+ indexes_drop = [2, 3]
+ command = 'p {0} stats port {1} 0'
+ for index, direction in ((5, 'up'), (9, 'down')):
+ key_in = "pkt_in_{0}_stream".format(direction)
+ key_drop = "pkt_drop_{0}_stream".format(direction)
+ for mode in ('in', 'out'):
+ stats = self.vnf_execute(command.format(index, mode))
+ match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if not match:
+ continue
+ result[key_in] += sum(int(match.group(x)) for x in indexes_in)
+ result[key_drop] += sum(int(match.group(x)) for x in indexes_drop)
+
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
return result
diff --git a/yardstick/network_services/vnf_generic/vnfdgen.py b/yardstick/network_services/vnf_generic/vnfdgen.py
index b56a91915..0120b493e 100644
--- a/yardstick/network_services/vnf_generic/vnfdgen.py
+++ b/yardstick/network_services/vnf_generic/vnfdgen.py
@@ -14,11 +14,16 @@
""" Generic file to map and build vnf discriptor """
from __future__ import absolute_import
-import collections
+from functools import reduce
import jinja2
+import logging
import yaml
+from yardstick.common.utils import try_int
+
+LOG = logging.getLogger(__name__)
+
def render(vnf_model, **kwargs):
"""Render jinja2 VNF template
@@ -40,7 +45,8 @@ def generate_vnfd(vnf_model, node):
as input for GenericVNF.__init__
"""
# get is unused as global method inside template
- node["get"] = get
+ # node["get"] = key_flatten_get
+ node["get"] = deepgetitem
# Set Node details to default if not defined in pod file
# we CANNOT use TaskTemplate.render because it does not allow
# for missing variables, we need to allow password for key_filename
@@ -52,36 +58,34 @@ def generate_vnfd(vnf_model, node):
return filled_vnfd
-def dict_key_flatten(data):
- """ Convert nested dict structure to dotted key
- (e.g. {"a":{"b":1}} -> {"a.b":1}
-
- :param data: nested dictionary
- :return: flat dicrionary
- """
- next_data = {}
-
- # check for non-string iterables
- if not any((isinstance(v, collections.Iterable) and not isinstance(v, str))
- for v in data.values()):
- return data
+# dict_flatten was causing recursion errors with Jinja2 so we removed and replaced
+# which this function from stackoverflow that doesn't require generating entire dictionaries
+# each time we query a key
+def deepgetitem(obj, item, default=None):
+ """Steps through an item chain to get the ultimate value.
- for key, val in data.items():
- if isinstance(val, collections.Mapping):
- for n_k, n_v in val.items():
- next_data["%s.%s" % (key, n_k)] = n_v
- elif isinstance(val, collections.Iterable) and not isinstance(val,
- str):
- for index, item in enumerate(val):
- next_data["%s%d" % (key, index)] = item
- else:
- next_data[key] = val
+ If ultimate value or path to value does not exist, does not raise
+ an exception and instead returns `fallback`.
- return dict_key_flatten(next_data)
+ Based on
+ https://stackoverflow.com/a/38623359
+ https://stackoverflow.com/users/1820042/donny-winston
+ add try_int to work with sequences
-def get(obj, key, *args):
- """ Get template key from dictionary, get default value or raise an exception
+ >>> d = {'snl_final': {'about': {'_icsd': {'icsd_id': 1, 'fr': [2, 3]}}}}
+ >>> deepgetitem(d, 'snl_final.about._icsd.icsd_id')
+ 1
+ >>> deepgetitem(d, 'snl_final.about._sandbox.sbx_id')
+ >>>
+ >>> deepgetitem(d, 'snl_final.about._icsd.fr.1')
+ 3
"""
- data = dict_key_flatten(obj)
- return data.get(key, *args)
+ def getitem(obj, name):
+ # if integer then list index
+ name = try_int(name)
+ try:
+ return obj[name]
+ except (KeyError, TypeError, IndexError):
+ return default
+ return reduce(getitem, item.split('.'), obj)