diff options
Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf')
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/base.py | 234 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/sample_vnf.py | 994 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/tg_ping.py | 155 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py | 303 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/tg_trex.py | 345 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/vpe_vnf.py | 545 |
6 files changed, 1536 insertions, 1040 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 2df6037f3..955f9f03d 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -15,10 +15,6 @@ from __future__ import absolute_import import logging -import ipaddress -import six - -from yardstick.network_services.utils import get_nsb_option LOG = logging.getLogger(__name__) @@ -61,192 +57,69 @@ class QueueFileWrapper(object): self.q_out.get() -class GenericVNF(object): +class VnfdHelper(dict): + + @property + def mgmt_interface(self): + return self["mgmt-interface"] + + @property + def vdu(self): + return self['vdu'] + + @property + def vdu0(self): + return self.vdu[0] + + @property + def interfaces(self): + return self.vdu0['external-interface'] + + @property + def kpi(self): + return self['benchmark']['kpi'] + + def find_virtual_interface(self, **kwargs): + key, value = next(iter(kwargs.items())) + for interface in self.interfaces: + virtual_intf = interface["virtual-interface"] + if virtual_intf[key] == value: + return interface + + def find_interface(self, **kwargs): + key, value = next(iter(kwargs.items())) + for interface in self.interfaces: + if interface[key] == value: + return interface + + +class VNFObject(object): + + def __init__(self, name, vnfd): + super(VNFObject, self).__init__() + self.name = name + self.vnfd_helper = VnfdHelper(vnfd) # fixme: parse this into a structure + + +class GenericVNF(VNFObject): + """ Class providing file-like API for generic VNF implementation """ - def __init__(self, vnfd): - super(GenericVNF, self).__init__() - self.vnfd = vnfd # fixme: parse this into a structure + def __init__(self, name, vnfd): + super(GenericVNF, self).__init__(name, vnfd) # List of statistics we can obtain from this VNF # - ETSI MANO 6.3.1.1 monitoring_parameter - self.kpi = self._get_kpi_definition(vnfd) + self.kpi = self._get_kpi_definition() # Standard dictionary containing params like thread no, buffer size etc self.config = {} self.runs_traffic = False - self.name = "vnf__1" # name in topology file - self.bin_path = get_nsb_option("bin_path", "") - @classmethod - def _get_kpi_definition(cls, vnfd): + def _get_kpi_definition(self): """ Get list of KPIs defined in VNFD :param vnfd: :return: list of KPIs, e.g. ['throughput', 'latency'] """ - return vnfd['benchmark']['kpi'] - - @classmethod - def get_ip_version(cls, ip_addr): - """ get ip address version v6 or v4 """ - try: - address = ipaddress.ip_address(six.text_type(ip_addr)) - except ValueError: - LOG.error(ip_addr, " is not valid") - return - else: - return address.version - - def _ip_to_hex(self, ip_addr): - ip_x = ip_addr - if self.get_ip_version(ip_addr) == 4: - ip_to_convert = ip_addr.split(".") - ip_octect = [int(octect) for octect in ip_to_convert] - ip_x = "{0[0]:02X}{0[1]:02X}{0[2]:02X}{0[3]:02X}".format(ip_octect) - return ip_x - - def _get_dpdk_port_num(self, name): - for intf in self.vnfd['vdu'][0]['external-interface']: - if name == intf['name']: - return intf['virtual-interface']['dpdk_port_num'] - - def _append_routes(self, ip_pipeline_cfg): - if 'routing_table' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['routing_table'] - - where = ip_pipeline_cfg.find("arp_route_tbl") - link = ip_pipeline_cfg[:where] - route_add = ip_pipeline_cfg[where:] - - tmp = route_add.find('\n') - route_add = route_add[tmp:] - - cmds = "arp_route_tbl =" - - for route in routing_table: - net = self._ip_to_hex(route['network']) - net_nm = self._ip_to_hex(route['netmask']) - net_gw = self._ip_to_hex(route['gateway']) - port = self._get_dpdk_port_num(route['if']) - cmd = \ - " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\ - "{port1_local_ip_hex})".format(port0_local_ip_hex=net, - port0_netmask_hex=net_nm, - dpdk_port=port, - port1_local_ip_hex=net_gw) - cmds += cmd - - cmds += '\n' - ip_pipeline_cfg = link + cmds + route_add - - return ip_pipeline_cfg - - def _append_nd_routes(self, ip_pipeline_cfg): - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - where = ip_pipeline_cfg.find("nd_route_tbl") - link = ip_pipeline_cfg[:where] - route_nd = ip_pipeline_cfg[where:] - - tmp = route_nd.find('\n') - route_nd = route_nd[tmp:] - - cmds = "nd_route_tbl =" - - for route in routing_table: - net = route['network'] - net_nm = route['netmask'] - net_gw = route['gateway'] - port = self._get_dpdk_port_num(route['if']) - cmd = \ - " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\ - "{port1_local_ip_hex})".format(port0_local_ip_hex=net, - port0_netmask_hex=net_nm, - dpdk_port=port, - port1_local_ip_hex=net_gw) - cmds += cmd - - cmds += '\n' - ip_pipeline_cfg = link + cmds + route_nd - - return ip_pipeline_cfg - - def _get_port0localip6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 1: - return_value = route['network'] - LOG.info("_get_port0localip6 : %s", return_value) - return return_value - - def _get_port1localip6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 2: - return_value = route['network'] - LOG.info("_get_port1localip6 : %s", return_value) - return return_value - - def _get_port0prefixlen6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 1: - return_value = route['netmask'] - LOG.info("_get_port0prefixlen6 : %s", return_value) - return return_value - - def _get_port1prefixlen6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 2: - return_value = route['netmask'] - LOG.info("_get_port1prefixlen6 : %s", return_value) - return return_value - - def _get_port0gateway6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 1: - return_value = route['network'] - LOG.info("_get_port0gateway6 : %s", return_value) - return return_value - - def _get_port1gateway6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 2: - return_value = route['network'] - LOG.info("_get_port1gateway6 : %s", return_value) - return return_value + return self.vnfd_helper.kpi def instantiate(self, scenario_cfg, context_cfg): """ Prepare VNF for operation and start the VNF process/VM @@ -284,11 +157,10 @@ class GenericVNF(object): class GenericTrafficGen(GenericVNF): """ Class providing file-like API for generic traffic generator """ - def __init__(self, vnfd): - super(GenericTrafficGen, self).__init__(vnfd) + def __init__(self, name, vnfd): + super(GenericTrafficGen, self).__init__(name, vnfd) self.runs_traffic = True self.traffic_finished = False - self.name = "tgen__1" # name in topology file def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py new file mode 100644 index 000000000..89c086d97 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -0,0 +1,994 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" Base class implementation for generic vnf implementation """ + +from __future__ import absolute_import + +import posixpath +import time +import logging +import os +import re +import subprocess +from collections import Mapping + +from multiprocessing import Queue, Value, Process + +from six.moves import cStringIO + +from yardstick.benchmark.contexts.base import Context +from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file +from yardstick.network_services.helpers.cpu import CpuSysCores +from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig +from yardstick.network_services.nfvi.resource import ResourceProfile +from yardstick.network_services.vnf_generic.vnf.base import GenericVNF +from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper +from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen +from yardstick.network_services.utils import get_nsb_option + +from stl.trex_stl_lib.trex_stl_client import STLClient +from stl.trex_stl_lib.trex_stl_client import LoggerApi +from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError + +from yardstick.ssh import AutoConnectSSH + +DPDK_VERSION = "dpdk-16.07" + +LOG = logging.getLogger(__name__) + + +REMOTE_TMP = "/tmp" + + +class VnfSshHelper(AutoConnectSSH): + + def __init__(self, node, bin_path, wait=None): + self.node = node + kwargs = self.args_from_node(self.node) + if wait: + kwargs.setdefault('wait', wait) + + super(VnfSshHelper, self).__init__(**kwargs) + self.bin_path = bin_path + + @staticmethod + def get_class(): + # must return static class name, anything else refers to the calling class + # i.e. the subclass, not the superclass + return VnfSshHelper + + def copy(self): + # this copy constructor is different from SSH classes, since it uses node + return self.get_class()(self.node, self.bin_path) + + def upload_config_file(self, prefix, content): + cfg_file = os.path.join(REMOTE_TMP, prefix) + LOG.debug(content) + file_obj = cStringIO(content) + self.put_file_obj(file_obj, cfg_file) + return cfg_file + + def join_bin_path(self, *args): + return os.path.join(self.bin_path, *args) + + def provision_tool(self, tool_path=None, tool_file=None): + if tool_path is None: + tool_path = self.bin_path + return super(VnfSshHelper, self).provision_tool(tool_path, tool_file) + + +class SetupEnvHelper(object): + + CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config") + CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script") + CORES = [] + DEFAULT_CONFIG_TPL_CFG = "sample.cfg" + PIPELINE_COMMAND = '' + VNF_TYPE = "SAMPLE" + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(SetupEnvHelper, self).__init__() + self.vnfd_helper = vnfd_helper + self.ssh_helper = ssh_helper + self.scenario_helper = scenario_helper + + def _get_ports_gateway(self, name): + routing_table = self.vnfd_helper.vdu0.get('routing_table', []) + for route in routing_table: + if name == route['if']: + return route['gateway'] + return None + + def build_config(self): + raise NotImplementedError + + def setup_vnf_environment(self): + pass + # raise NotImplementedError + + def tear_down(self): + raise NotImplementedError + + +class DpdkVnfSetupEnvHelper(SetupEnvHelper): + + APP_NAME = 'DpdkVnf' + DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}" + DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}" + FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'" + + HW_DEFAULT_CORE = 3 + SW_DEFAULT_CORE = 2 + + DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)") + + @staticmethod + def _update_packet_type(ip_pipeline_cfg, traffic_options): + match_str = 'pkt_type = ipv4' + replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type']) + pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str) + return pipeline_config_str + + @classmethod + def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options): + traffic_type = traffic_options['traffic_type'] + + if traffic_options['vnf_type'] is not cls.APP_NAME: + match_str = 'traffic_type = 4' + replace_str = 'traffic_type = {0}'.format(traffic_type) + + elif traffic_type == 4: + match_str = 'pkt_type = ipv4' + replace_str = 'pkt_type = ipv4' + + else: + match_str = 'pkt_type = ipv4' + replace_str = 'pkt_type = ipv6' + + pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str) + return pipeline_config_str + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper) + self.all_ports = None + self.bound_pci = None + self._dpdk_nic_bind = None + self.socket = None + + @property + def dpdk_nic_bind(self): + if self._dpdk_nic_bind is None: + self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py") + return self._dpdk_nic_bind + + def _setup_hugepages(self): + cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo" + hugepages = self.ssh_helper.execute(cmd)[1].rstrip() + + memory_path = \ + '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages + self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path) + + if hugepages == "2048kB": + pages = 16384 + else: + pages = 16 + + self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path)) + + def _get_dpdk_port_num(self, name): + interface = self.vnfd_helper.find_interface(name=name) + return interface['virtual-interface']['dpdk_port_num'] + + def build_config(self): + vnf_cfg = self.scenario_helper.vnf_cfg + task_path = self.scenario_helper.task_path + + lb_count = vnf_cfg.get('lb_count', 3) + lb_config = vnf_cfg.get('lb_config', 'SW') + worker_config = vnf_cfg.get('worker_config', '1C/1T') + worker_threads = vnf_cfg.get('worker_threads', 3) + + traffic_type = self.scenario_helper.all_options.get('traffic_type', 4) + traffic_options = { + 'traffic_type': traffic_type, + 'pkt_type': 'ipv%s' % traffic_type, + 'vnf_type': self.VNF_TYPE, + } + + config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path) + config_basename = posixpath.basename(self.CFG_CONFIG) + script_basename = posixpath.basename(self.CFG_SCRIPT) + multiport = MultiPortConfig(self.scenario_helper.topology, + config_tpl_cfg, + config_basename, + self.vnfd_helper.interfaces, + self.VNF_TYPE, + lb_count, + worker_threads, + worker_config, + lb_config, + self.socket) + + multiport.generate_config() + with open(self.CFG_CONFIG) as handle: + new_config = handle.read() + + new_config = self._update_traffic_type(new_config, traffic_options) + new_config = self._update_packet_type(new_config, traffic_options) + + self.ssh_helper.upload_config_file(config_basename, new_config) + self.ssh_helper.upload_config_file(script_basename, + multiport.generate_script(self.vnfd_helper)) + self.all_ports = multiport.port_pair_list + + LOG.info("Provision and start the %s", self.APP_NAME) + self._build_pipeline_kwargs() + return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs) + + def _build_pipeline_kwargs(self): + tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) + ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1) + self.pipeline_kwargs = { + 'cfg_file': self.CFG_CONFIG, + 'script': self.CFG_SCRIPT, + 'ports_len_hex': ports_len_hex, + 'tool_path': tool_path, + } + + def _get_app_cpu(self): + if self.CORES: + return self.CORES + + vnf_cfg = self.scenario_helper.vnf_cfg + sys_obj = CpuSysCores(self.ssh_helper) + self.sys_cpu = sys_obj.get_core_socket() + num_core = int(vnf_cfg["worker_threads"]) + if vnf_cfg.get("lb_config", "SW") == 'HW': + num_core += self.HW_DEFAULT_CORE + else: + num_core += self.SW_DEFAULT_CORE + app_cpu = self.sys_cpu[str(self.socket)][:num_core] + return app_cpu + + def _get_cpu_sibling_list(self, cores=None): + if cores is None: + cores = self._get_app_cpu() + sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list" + awk_template = "awk -F: '{ print $1 }' < %s" + sys_path = "/sys/devices/system/cpu/" + cpu_topology = [] + try: + for core in cores: + sys_cmd = sys_cmd_template % (sys_path, core) + cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1] + cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(',')) + + return cpu_topology + except Exception: + return [] + + def _validate_cpu_cfg(self): + return self._get_cpu_sibling_list() + + def _find_used_drivers(self): + cmd = "{0} -s".format(self.dpdk_nic_bind) + rc, dpdk_status, _ = self.ssh_helper.execute(cmd) + + self.used_drivers = { + vpci: (index, driver) + for index, (vpci, driver) + in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status)) + if any(b.endswith(vpci) for b in self.bound_pci) + } + + def setup_vnf_environment(self): + self._setup_dpdk() + resource = self._setup_resources() + self._kill_vnf() + self._detect_drivers() + return resource + + def _kill_vnf(self): + self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME) + + def _setup_dpdk(self): + """ setup dpdk environment needed for vnf to run """ + + self._setup_hugepages() + self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio") + + exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0] + if exit_status == 0: + return + + dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION) + dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh") + exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0] + if exit_status != 0: + self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) + + def _setup_resources(self): + interfaces = self.vnfd_helper.interfaces + self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] + + # what is this magic? how do we know which socket is for which port? + # what about quad-socket? + if any(v[5] == "0" for v in self.bound_pci): + self.socket = 0 + else: + self.socket = 1 + + cores = self._validate_cpu_cfg() + return ResourceProfile(self.vnfd_helper, cores) + + def _detect_drivers(self): + interfaces = self.vnfd_helper.interfaces + + self._find_used_drivers() + for vpci, (index, _) in self.used_drivers.items(): + try: + intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci']) + except StopIteration: + pass + else: + intf1['dpdk_port_num'] = index + + for vpci in self.bound_pci: + self._bind_dpdk('igb_uio', vpci) + time.sleep(2) + + def _bind_dpdk(self, driver, vpci, force=True): + if force: + force = '--force ' + else: + force = '' + cmd = self.DPDK_BIND_CMD.format(force=force, + dpdk_nic_bind=self.dpdk_nic_bind, + driver=driver, + vpci=vpci) + self.ssh_helper.execute(cmd) + + def _detect_and_bind_dpdk(self, vpci, driver): + find_net_cmd = self.FIND_NET_CMD.format(vpci) + exit_status, _, _ = self.ssh_helper.execute(find_net_cmd) + if exit_status == 0: + # already bound + return None + self._bind_dpdk(driver, vpci) + exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd) + if exit_status != 0: + # failed to bind + return None + return stdout + + def _bind_kernel_devices(self): + for intf in self.vnfd_helper.interfaces: + vi = intf["virtual-interface"] + stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"]) + if stdout is not None: + vi["local_iface_name"] = posixpath.basename(stdout) + + def tear_down(self): + for vpci, (_, driver) in self.used_drivers.items(): + self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind, + driver=driver, + vpci=vpci)) + + +class ResourceHelper(object): + + COLLECT_KPI = '' + MAKE_INSTALL = 'cd {0} && make && sudo make install' + RESOURCE_WORD = 'sample' + + COLLECT_MAP = {} + + def __init__(self, setup_helper): + super(ResourceHelper, self).__init__() + self.resource = None + self.setup_helper = setup_helper + self.ssh_helper = setup_helper.ssh_helper + + def setup(self): + self.resource = self.setup_helper.setup_vnf_environment() + + def generate_cfg(self): + pass + + def _collect_resource_kpi(self): + result = {} + status = self.resource.check_if_sa_running("collectd")[0] + if status: + result = self.resource.amqp_collect_nfvi_kpi() + + result = {"core": result} + return result + + def start_collect(self): + self.resource.initiate_systemagent(self.ssh_helper.bin_path) + self.resource.start() + self.resource.amqp_process_for_nfvi_kpi() + + def stop_collect(self): + if self.resource: + self.resource.stop() + + def collect_kpi(self): + return self._collect_resource_kpi() + + +class ClientResourceHelper(ResourceHelper): + + RUN_DURATION = 60 + QUEUE_WAIT_TIME = 5 + SYNC_PORT = 1 + ASYNC_PORT = 2 + + def __init__(self, setup_helper): + super(ClientResourceHelper, self).__init__(setup_helper) + self.vnfd_helper = setup_helper.vnfd_helper + self.scenario_helper = setup_helper.scenario_helper + + self.client = None + self.client_started = Value('i', 0) + self.my_ports = None + self._queue = Queue() + self._result = {} + self._terminated = Value('i', 0) + self._vpci_ascending = None + + def _build_ports(self): + self.my_ports = [0, 1] + + def get_stats(self, *args, **kwargs): + try: + return self.client.get_stats(*args, **kwargs) + except STLStateError: + LOG.exception("TRex client not connected") + return {} + + def generate_samples(self, key=None, default=None): + last_result = self.get_stats(self.my_ports) + key_value = last_result.get(key, default) + + if not isinstance(last_result, Mapping): # added for mock unit test + self._terminated.value = 1 + return {} + + samples = {} + for vpci_idx, vpci in enumerate(self._vpci_ascending): + name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"] + # fixme: VNFDs KPIs values needs to be mapped to TRex structure + xe_value = last_result.get(vpci_idx, {}) + samples[name] = { + "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), + "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), + "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), + "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), + "in_packets": int(xe_value.get("ipackets", 0)), + "out_packets": int(xe_value.get("opackets", 0)), + } + if key: + samples[name][key] = key_value + return samples + + def _run_traffic_once(self, traffic_profile): + traffic_profile.execute(self) + self.client_started.value = 1 + time.sleep(self.RUN_DURATION) + samples = self.generate_samples() + time.sleep(self.QUEUE_WAIT_TIME) + self._queue.put(samples) + + def run_traffic(self, traffic_profile): + # fixme: fix passing correct trex config file, + # instead of searching the default path + self._build_ports() + self.client = self._connect() + self.client.reset(ports=self.my_ports) + self.client.remove_all_streams(self.my_ports) # remove all streams + traffic_profile.register_generator(self) + + while self._terminated.value == 0: + self._run_traffic_once(traffic_profile) + + self.client.stop(self.my_ports) + self.client.disconnect() + self._terminated.value = 0 + + def terminate(self): + self._terminated.value = 1 # stop client + + def clear_stats(self, ports=None): + if ports is None: + ports = self.my_ports + self.client.clear_stats(ports=ports) + + def start(self, ports=None, *args, **kwargs): + if ports is None: + ports = self.my_ports + self.client.start(ports=ports, *args, **kwargs) + + def collect_kpi(self): + if not self._queue.empty(): + kpi = self._queue.get() + self._result.update(kpi) + LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result)) + return self._result + + def _connect(self, client=None): + if client is None: + client = STLClient(username=self.vnfd_helper.mgmt_interface["user"], + server=self.vnfd_helper.mgmt_interface["ip"], + verbose_level=LoggerApi.VERBOSE_QUIET) + + # try to connect with 5s intervals, 30s max + for idx in range(6): + try: + client.connect() + break + except STLError: + LOG.info("Unable to connect to Trex Server.. Attempt %s", idx) + time.sleep(5) + return client + + +class Rfc2544ResourceHelper(object): + + DEFAULT_CORRELATED_TRAFFIC = False + DEFAULT_LATENCY = False + DEFAULT_TOLERANCE = '0.0001 - 0.0001' + + def __init__(self, scenario_helper): + super(Rfc2544ResourceHelper, self).__init__() + self.scenario_helper = scenario_helper + self._correlated_traffic = None + self.iteration = Value('i', 0) + self._latency = None + self._rfc2544 = None + self._tolerance_low = None + self._tolerance_high = None + + @property + def rfc2544(self): + if self._rfc2544 is None: + self._rfc2544 = self.scenario_helper.all_options['rfc2544'] + return self._rfc2544 + + @property + def tolerance_low(self): + if self._tolerance_low is None: + self.get_rfc_tolerance() + return self._tolerance_low + + @property + def tolerance_high(self): + if self._tolerance_high is None: + self.get_rfc_tolerance() + return self._tolerance_high + + @property + def correlated_traffic(self): + if self._correlated_traffic is None: + self._correlated_traffic = \ + self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC) + + return self._correlated_traffic + + @property + def latency(self): + if self._latency is None: + self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY) + return self._latency + + def get_rfc2544(self, name, default=None): + return self.rfc2544.get(name, default) + + def get_rfc_tolerance(self): + tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE) + tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-'))) + self._tolerance_low = next(tolerance_iter) + self._tolerance_high = next(tolerance_iter, self.tolerance_low) + + +class SampleVNFDeployHelper(object): + + SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf' + REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO) + SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME) + + def __init__(self, vnfd_helper, ssh_helper): + super(SampleVNFDeployHelper, self).__init__() + self.ssh_helper = ssh_helper + self.vnfd_helper = vnfd_helper + + DISABLE_DEPLOY = True + + def deploy_vnfs(self, app_name): + # temp disable for now + if self.DISABLE_DEPLOY: + return + + vnf_bin = self.ssh_helper.join_bin_path(app_name) + exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0] + if not exit_status: + return + + subprocess.check_output(["rm", "-rf", self.REPO_NAME]) + subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO]) + time.sleep(2) + self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR) + self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True) + + build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh') + time.sleep(2) + http_proxy = os.environ.get('http_proxy', '') + https_proxy = os.environ.get('https_proxy', '') + cmd = "sudo -E %s --silent '%s' '%s'" % (build_script, http_proxy, https_proxy) + LOG.debug(cmd) + self.ssh_helper.execute(cmd) + vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name) + self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path) + self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin)) + + +class ScenarioHelper(object): + + DEFAULT_VNF_CFG = { + 'lb_config': 'SW', + 'lb_count': 1, + 'worker_config': '1C/1T', + 'worker_threads': 1, + } + + def __init__(self, name): + self.name = name + self.scenario_cfg = None + + @property + def task_path(self): + return self.scenario_cfg["task_path"] + + @property + def nodes(self): + return self.scenario_cfg['nodes'] + + @property + def all_options(self): + return self.scenario_cfg["options"] + + @property + def options(self): + return self.all_options[self.name] + + @property + def vnf_cfg(self): + return self.options.get('vnf_config', self.DEFAULT_VNF_CFG) + + @property + def topology(self): + return self.scenario_cfg['topology'] + + +class SampleVNF(GenericVNF): + """ Class providing file-like API for generic VNF implementation """ + + VNF_PROMPT = "pipeline>" + WAIT_TIME = 1 + + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + super(SampleVNF, self).__init__(name, vnfd) + self.bin_path = get_nsb_option('bin_path', '') + + self.scenario_helper = ScenarioHelper(self.name) + self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path) + + if setup_env_helper_type is None: + setup_env_helper_type = SetupEnvHelper + + self.setup_helper = setup_env_helper_type(self.vnfd_helper, + self.ssh_helper, + self.scenario_helper) + + self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper) + + if resource_helper_type is None: + resource_helper_type = ResourceHelper + + self.resource_helper = resource_helper_type(self.setup_helper) + + self.all_ports = None + self.context_cfg = None + self.nfvi_context = None + self.pipeline_kwargs = {} + self.priv_ports = None + self.pub_ports = None + # TODO(esm): make QueueFileWrapper invert-able so that we + # never have to manage the queues + self.q_in = Queue() + self.q_out = Queue() + self.queue_wrapper = None + self.run_kwargs = {} + self.scenario_cfg = None + self.tg_port_pairs = None + self.used_drivers = {} + self.vnf_port_pairs = None + self._vnf_process = None + + def _get_route_data(self, route_index, route_type): + route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', [])) + for _ in range(route_index): + next(route_iter, '') + return next(route_iter, {}).get(route_type, '') + + def _get_port0localip6(self): + return_value = self._get_route_data(0, 'network') + LOG.info("_get_port0localip6 : %s", return_value) + return return_value + + def _get_port1localip6(self): + return_value = self._get_route_data(1, 'network') + LOG.info("_get_port1localip6 : %s", return_value) + return return_value + + def _get_port0prefixlen6(self): + return_value = self._get_route_data(0, 'netmask') + LOG.info("_get_port0prefixlen6 : %s", return_value) + return return_value + + def _get_port1prefixlen6(self): + return_value = self._get_route_data(1, 'netmask') + LOG.info("_get_port1prefixlen6 : %s", return_value) + return return_value + + def _get_port0gateway6(self): + return_value = self._get_route_data(0, 'network') + LOG.info("_get_port0gateway6 : %s", return_value) + return return_value + + def _get_port1gateway6(self): + return_value = self._get_route_data(1, 'network') + LOG.info("_get_port1gateway6 : %s", return_value) + return return_value + + def _start_vnf(self): + self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) + self._vnf_process = Process(target=self._run) + self._vnf_process.start() + + def _vnf_up_post(self): + pass + + def instantiate(self, scenario_cfg, context_cfg): + self.scenario_helper.scenario_cfg = scenario_cfg + self.context_cfg = context_cfg + self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) + # self.nfvi_context = None + + self.deploy_helper.deploy_vnfs(self.APP_NAME) + self.resource_helper.setup() + self._start_vnf() + + def wait_for_instantiate(self): + buf = [] + time.sleep(self.WAIT_TIME) # Give some time for config to load + while True: + if not self._vnf_process.is_alive(): + raise RuntimeError("%s VNF process died." % self.APP_NAME) + + # TODO(esm): move to QueueFileWrapper + while self.q_out.qsize() > 0: + buf.append(self.q_out.get()) + message = ''.join(buf) + if self.VNF_PROMPT in message: + LOG.info("%s VNF is up and running.", self.APP_NAME) + self._vnf_up_post() + self.queue_wrapper.clear() + self.resource_helper.start_collect() + return self._vnf_process.exitcode + + if "PANIC" in message: + raise RuntimeError("Error starting %s VNF." % + self.APP_NAME) + + LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) + time.sleep(1) + + def _build_run_kwargs(self): + self.run_kwargs = { + 'stdin': self.queue_wrapper, + 'stdout': self.queue_wrapper, + 'keep_stdin_open': True, + 'pty': True, + } + + def _build_config(self): + return self.setup_helper.build_config() + + def _run(self): + # we can't share ssh paramiko objects to force new connection + self.ssh_helper.drop_connection() + cmd = self._build_config() + # kill before starting + self.ssh_helper.execute("pkill {}".format(self.APP_NAME)) + + LOG.debug(cmd) + self._build_run_kwargs() + self.ssh_helper.run(cmd, **self.run_kwargs) + + def vnf_execute(self, cmd, wait_time=2): + """ send cmd to vnf process """ + + LOG.info("%s command: %s", self.APP_NAME, cmd) + self.q_in.put("{}\r\n".format(cmd)) + time.sleep(wait_time) + output = [] + while self.q_out.qsize() > 0: + output.append(self.q_out.get()) + return "".join(output) + + def _tear_down(self): + pass + + def terminate(self): + self.vnf_execute("quit") + if self._vnf_process: + self._vnf_process.terminate() + self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME) + self._tear_down() + self.resource_helper.stop_collect() + + def get_stats(self, *args, **kwargs): + """ + Method for checking the statistics + + :return: + VNF statistics + """ + cmd = 'p {0} stats'.format(self.APP_WORD) + out = self.vnf_execute(cmd) + return out + + def collect_kpi(self): + stats = self.get_stats() + m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) + if m: + result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()} + result["collect_stats"] = self.resource_helper.collect_kpi() + else: + result = { + "packets_in": 0, + "packets_fwd": 0, + "packets_dropped": 0, + } + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) + return result + + +class SampleVNFTrafficGen(GenericTrafficGen): + """ Class providing file-like API for generic traffic generator """ + + APP_NAME = 'Sample' + RUN_WAIT = 1 + + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + super(SampleVNFTrafficGen, self).__init__(name, vnfd) + self.bin_path = get_nsb_option('bin_path', '') + self.name = "tgen__1" # name in topology file + + self.scenario_helper = ScenarioHelper(self.name) + self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True) + + if setup_env_helper_type is None: + setup_env_helper_type = SetupEnvHelper + + self.setup_helper = setup_env_helper_type(self.vnfd_helper, + self.ssh_helper, + self.scenario_helper) + + if resource_helper_type is None: + resource_helper_type = ClientResourceHelper + + self.resource_helper = resource_helper_type(self.setup_helper) + + self.runs_traffic = True + self.traffic_finished = False + self.tg_port_pairs = None + self._tg_process = None + self._traffic_process = None + + def _start_server(self): + # we can't share ssh paramiko objects to force new connection + self.ssh_helper.drop_connection() + + def instantiate(self, scenario_cfg, context_cfg): + self.scenario_helper.scenario_cfg = scenario_cfg + self.resource_helper.generate_cfg() + self.setup_helper.setup_vnf_environment() + self.resource_helper.setup() + + LOG.info("Starting %s server...", self.APP_NAME) + self._tg_process = Process(target=self._start_server) + self._tg_process.start() + + def wait_for_instantiate(self): + return self._wait_for_process() + + def _check_status(self): + raise NotImplementedError + + def _wait_for_process(self): + while True: + if not self._tg_process.is_alive(): + raise RuntimeError("%s traffic generator process died." % self.APP_NAME) + LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME) + time.sleep(1) + status = self._check_status() + if status == 0: + LOG.info("%s TG Server is up and running.", self.APP_NAME) + return self._tg_process.exitcode + + def _traffic_runner(self, traffic_profile): + LOG.info("Starting %s client...", self.APP_NAME) + self.resource_helper.run_traffic(traffic_profile) + + def run_traffic(self, traffic_profile): + """ Generate traffic on the wire according to the given params. + Method is non-blocking, returns immediately when traffic process + is running. Mandatory. + + :param traffic_profile: + :return: True/False + """ + self._traffic_process = Process(target=self._traffic_runner, + args=(traffic_profile,)) + self._traffic_process.start() + # Wait for traffic process to start + while self.resource_helper.client_started.value == 0: + time.sleep(self.RUN_WAIT) + + return self._traffic_process.is_alive() + + def listen_traffic(self, traffic_profile): + """ Listen to traffic with the given parameters. + Method is non-blocking, returns immediately when traffic process + is running. Optional. + + :param traffic_profile: + :return: True/False + """ + pass + + def verify_traffic(self, traffic_profile): + """ Verify captured traffic after it has ended. Optional. + + :param traffic_profile: + :return: dict + """ + pass + + def collect_kpi(self): + result = self.resource_helper.collect_kpi() + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) + return result + + def terminate(self): + """ After this method finishes, all traffic processes should stop. Mandatory. + + :return: True/False + """ + self.traffic_finished = True + if self._traffic_process is not None: + self._traffic_process.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py index 000a91db4..e65296287 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py @@ -16,14 +16,13 @@ from __future__ import absolute_import from __future__ import print_function import logging -import multiprocessing import re -import time -import os -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen -from yardstick.network_services.utils import provision_tool +from multiprocessing import Queue +from ipaddress import IPv4Interface + +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen +from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper LOG = logging.getLogger(__name__) @@ -42,77 +41,59 @@ class PingParser(object): if match: # IMPORTANT: in order for the data to be properly taken # in by InfluxDB, it needs to be converted to numeric types - self.queue.put({"packets_received": float(match.group(1)), - "rtt": float(match.group(2))}) + self.queue.put({ + "packets_received": float(match.group(1)), + "rtt": float(match.group(2)), + }) def close(self): - ''' close the ssh connection ''' - pass + """ close the ssh connection """ + self.closed = True def clear(self): - ''' clear queue till Empty ''' + """ clear queue till Empty """ while self.queue.qsize() > 0: self.queue.get() -class PingTrafficGen(GenericTrafficGen): +class PingSetupEnvHelper(DpdkVnfSetupEnvHelper): + + def setup_vnf_environment(self): + self._bind_kernel_devices() + + +class PingTrafficGen(SampleVNFTrafficGen): """ This traffic generator can ping a single IP with pingsize and target given in traffic profile """ - def __init__(self, vnfd): - super(PingTrafficGen, self).__init__(vnfd) + TG_NAME = 'Ping' + RUN_WAIT = 4 + + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = PingSetupEnvHelper + + super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) + self._queue = Queue() + self._parser = PingParser(self._queue) self._result = {} - self._parser = None - self._queue = None - self._traffic_process = None - - mgmt_interface = vnfd["mgmt-interface"] - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - - def _bind_device_kernel(self, connection): - dpdk_nic_bind = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "dpdk_nic_bind.py")) - - drivers = {intf["virtual-interface"]["vpci"]: - intf["virtual-interface"]["driver"] - for intf in self.vnfd["vdu"][0]["external-interface"]} - - commands = \ - ['"{0}" --force -b "{1}" "{2}"'.format(dpdk_nic_bind, value, key) - for key, value in drivers.items()] - for command in commands: - connection.execute(command) - - for index, out in enumerate(self.vnfd["vdu"][0]["external-interface"]): - vpci = out["virtual-interface"]["vpci"] - net = "find /sys/class/net -lname '*{}*' -printf '%f'".format(vpci) - out = connection.execute(net)[1] - ifname = out.split('/')[-1].strip('\n') - self.vnfd["vdu"][0]["external-interface"][index][ - "virtual-interface"]["local_iface_name"] = ifname def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(PingTrafficGen, self).scale(flavor) + """ scale vnf-based on flavor input """ + pass - def instantiate(self, scenario_cfg, context_cfg): - self._result = {"packets_received": 0, "rtt": 0} - self._bind_device_kernel(self.connection) + def _check_status(self): + return self._tg_process.is_alive() - def run_traffic(self, traffic_profile): - self._queue = multiprocessing.Queue() - self._parser = PingParser(self._queue) - self._traffic_process = \ - multiprocessing.Process(target=self._traffic_runner, - args=(traffic_profile, self._parser)) - self._traffic_process.start() - # Wait for traffic process to start - time.sleep(4) - return self._traffic_process.is_alive() + def instantiate(self, scenario_cfg, context_cfg): + self._result = { + "packets_received": 0, + "rtt": 0, + } + self.setup_helper.setup_vnf_environment() def listen_traffic(self, traffic_profile): """ Not needed for ping @@ -122,38 +103,26 @@ class PingTrafficGen(GenericTrafficGen): """ pass - def _traffic_runner(self, traffic_profile, filewrapper): - - mgmt_interface = self.vnfd["mgmt-interface"] - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - external_interface = self.vnfd["vdu"][0]["external-interface"] - virtual_interface = external_interface[0]["virtual-interface"] - target_ip = virtual_interface["dst_ip"].split('/')[0] - local_ip = virtual_interface["local_ip"].split('/')[0] - local_if_name = \ - virtual_interface["local_iface_name"].split('/')[0] - packet_size = traffic_profile.params["traffic_profile"]["frame_size"] - - run_cmd = [] - - run_cmd.append("ip addr flush %s" % local_if_name) - run_cmd.append("ip addr add %s/24 dev %s" % (local_ip, local_if_name)) - run_cmd.append("ip link set %s up" % local_if_name) - - for cmd in run_cmd: - self.connection.execute(cmd) - - ping_cmd = ("ping -s %s %s" % (packet_size, target_ip)) - self.connection.run(ping_cmd, stdout=filewrapper, + def _traffic_runner(self, traffic_profile): + intf = self.vnfd_helper.interfaces[0]["virtual-interface"] + profile = traffic_profile.params["traffic_profile"] + cmd_kwargs = { + 'target_ip': IPv4Interface(intf["dst_ip"]).ip.exploded, + 'local_ip': IPv4Interface(intf["local_ip"]).ip.exploded, + 'local_if_name': intf["local_iface_name"].split('/')[0], + 'packet_size': profile["frame_size"], + } + + cmd_list = [ + "sudo ip addr flush {local_if_name}", + "sudo ip addr add {local_ip}/24 dev {local_if_name}", + "sudo ip link set {local_if_name} up", + ] + + for cmd in cmd_list: + self.ssh_helper.execute(cmd.format(**cmd_kwargs)) + + ping_cmd = "ping -s {packet_size} {target_ip}" + self.ssh_helper.run(ping_cmd.format(**cmd_kwargs), + stdout=self._parser, keep_stdin_open=True, pty=True) - - def collect_kpi(self): - if not self._queue.empty(): - kpi = self._queue.get() - self._result.update(kpi) - return self._result - - def terminate(self): - if self._traffic_process is not None: - self._traffic_process.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py index 7da4b31e9..79e42e0a8 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py @@ -15,267 +15,98 @@ from __future__ import absolute_import from __future__ import print_function -import multiprocessing import time import logging -import os -import yaml +from collections import Mapping +from itertools import chain -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen -from yardstick.network_services.utils import get_nsb_option -from stl.trex_stl_lib.trex_stl_client import STLClient -from stl.trex_stl_lib.trex_stl_client import LoggerApi -from stl.trex_stl_lib.trex_stl_exceptions import STLError +from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig +from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen +from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper +from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper LOGGING = logging.getLogger(__name__) -DURATION = 30 -WAIT_TIME = 3 -TREX_SYNC_PORT = 4500 -TREX_ASYNC_PORT = 4501 +class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper): -class TrexTrafficGenRFC(GenericTrafficGen): - """ - This class handles mapping traffic profile and generating - traffic for rfc2544 testcase. - """ - - def __init__(self, vnfd): - super(TrexTrafficGenRFC, self).__init__(vnfd) - self._result = {} - self._terminated = multiprocessing.Value('i', 0) - self._queue = multiprocessing.Queue() - self._terminated = multiprocessing.Value('i', 0) - self._traffic_process = None - self._vpci_ascending = None - self.tc_file_name = None - self.client = None - self.my_ports = None - - mgmt_interface = self.vnfd["mgmt-interface"] - - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - - @classmethod - def _split_mac_address_into_list(cls, mac): - octets = mac.split(':') - for i, elem in enumerate(octets): - octets[i] = "0x" + str(elem) - return octets - - def _generate_trex_cfg(self, vnfd): - """ - - :param vnfd: vnfd.yaml - :return: trex_cfg.yaml file - """ - trex_cfg = dict( - port_limit=0, - version='2', - interfaces=[], - port_info=list(dict( - )) - ) - trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"]) - trex_cfg["version"] = '2' - - cfg_file = [] - vpci = [] - port = {} - - ext_intf = vnfd["vdu"][0]["external-interface"] - for interface in ext_intf: - virt_intf = interface["virtual-interface"] - vpci.append(virt_intf["vpci"]) - - port["src_mac"] = \ - self._split_mac_address_into_list(virt_intf["local_mac"]) - - time.sleep(WAIT_TIME) - port["dest_mac"] = \ - self._split_mac_address_into_list(virt_intf["dst_mac"]) - if virt_intf["dst_mac"]: - trex_cfg["port_info"].append(port.copy()) - - trex_cfg["interfaces"] = vpci - cfg_file.append(trex_cfg) - - with open('/tmp/trex_cfg.yaml', 'w') as outfile: - outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False)) - self.connection.put('/tmp/trex_cfg.yaml', '/etc') - - self._vpci_ascending = sorted(vpci) - - def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(TrexTrafficGenRFC, self).scale(flavor) - - def instantiate(self, scenario_cfg, context_cfg): - self._generate_trex_cfg(self.vnfd) - self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc']) - trex = os.path.join(self.bin_path, "trex") - err, _, _ = \ - self.connection.execute("ls {} >/dev/null 2>&1".format(trex)) - if err != 0: - self.connection.put(trex, trex, True) + def is_done(self): + return self.latency and self.iteration.value > 10 - LOGGING.debug("Starting TRex server...") - _tg_server = \ - multiprocessing.Process(target=self._start_server) - _tg_server.start() - while True: - LOGGING.info("Waiting for TG Server to start.. ") - time.sleep(WAIT_TIME) - status = \ - self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0] - if status == 0: - LOGGING.info("TG server is up and running.") - return _tg_server.exitcode - if not _tg_server.is_alive(): - raise RuntimeError("Traffic Generator process died.") +class TrexRfcResourceHelper(TrexResourceHelper): - def listen_traffic(self, traffic_profile): - pass + LATENCY_TIME_SLEEP = 120 + RUN_DURATION = 30 + WAIT_TIME = 3 - def _get_logical_if_name(self, vpci): - ext_intf = self.vnfd["vdu"][0]["external-interface"] - for interface in range(len(self.vnfd["vdu"][0]["external-interface"])): - virtual_intf = ext_intf[interface]["virtual-interface"] - if virtual_intf["vpci"] == vpci: - return ext_intf[interface]["name"] + def __init__(self, setup_helper, rfc_helper_type=None): + super(TrexRfcResourceHelper, self).__init__(setup_helper) - def run_traffic(self, traffic_profile, - client_started=multiprocessing.Value('i', 0)): + if rfc_helper_type is None: + rfc_helper_type = TrexRfc2544ResourceHelper - self._traffic_process = \ - multiprocessing.Process(target=self._traffic_runner, - args=(traffic_profile, self._queue, - client_started, self._terminated)) - self._traffic_process.start() - # Wait for traffic process to start - while client_started.value == 0: - time.sleep(1) + self.rfc2544_helper = rfc_helper_type(self.scenario_helper) + # self.tg_port_pairs = [] - return self._traffic_process.is_alive() + def _build_ports(self): + self.tg_port_pairs, self.networks = MultiPortConfig.get_port_pairs( + self.vnfd_helper.interfaces) + self.priv_ports = [int(x[0][-1]) for x in self.tg_port_pairs] + self.pub_ports = [int(x[1][-1]) for x in self.tg_port_pairs] + self.my_ports = list(set(chain(self.priv_ports, self.pub_ports))) - def _start_server(self): - mgmt_interface = self.vnfd["mgmt-interface"] - - _server = ssh.SSH.from_node(mgmt_interface) - _server.wait() - - _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) - _server.execute("pkill -9 rex > /dev/null 2>&1") - - trex_path = os.path.join(self.bin_path, "trex/scripts") - path = get_nsb_option("trex_path", trex_path) - trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1" - - _server.execute(trex_cmd) - - def _connect_client(self, client=None): - if client is None: - client = STLClient(username=self.vnfd["mgmt-interface"]["user"], - server=self.vnfd["mgmt-interface"]["ip"], - verbose_level=LoggerApi.VERBOSE_QUIET) - for idx in range(6): - try: - client.connect() - break - except STLError: - LOGGING.info("Unable to connect to Trex. Attempt %s", idx) - time.sleep(WAIT_TIME) - return client - - @classmethod - def _get_rfc_tolerance(cls, tc_yaml): - tolerance = '0.8 - 1.0' - if 'tc_options' in tc_yaml['scenarios'][0]: - tc_options = tc_yaml['scenarios'][0]['tc_options'] - if 'rfc2544' in tc_options: - tolerance = \ - tc_options['rfc2544'].get('allowed_drop_rate', '0.8 - 1.0') - - tolerance = tolerance.split('-') - min_tol = float(tolerance[0]) - if len(tolerance) == 2: - max_tol = float(tolerance[1]) - else: - max_tol = float(tolerance[0]) - - return [min_tol, max_tol] - - def _traffic_runner(self, traffic_profile, queue, - client_started, terminated): - LOGGING.info("Starting TRex client...") - tc_yaml = {} - - with open(self.tc_file_name) as tc_file: - tc_yaml = yaml.load(tc_file.read()) + def _run_traffic_once(self, traffic_profile): + traffic_profile.execute(self) + self.client_started.value = 1 + time.sleep(self.RUN_DURATION) + self.client.stop(self.my_ports) + time.sleep(self.WAIT_TIME) + samples = traffic_profile.get_drop_percentage(self) + self._queue.put(samples) - tolerance = self._get_rfc_tolerance(tc_yaml) + if not self.rfc2544_helper.is_done(): + return - # fixme: fix passing correct trex config file, - # instead of searching the default path - self.my_ports = [0, 1] - self.client = self._connect_client() + self.client.stop(self.my_ports) self.client.reset(ports=self.my_ports) - self.client.remove_all_streams(self.my_ports) # remove all streams - while not terminated.value: - traffic_profile.execute(self) - client_started.value = 1 - time.sleep(DURATION) + self.client.remove_all_streams(self.my_ports) + traffic_profile.execute_latency(samples=samples) + multiplier = traffic_profile.calculate_pps(samples)[1] + for _ in range(5): + time.sleep(self.LATENCY_TIME_SLEEP) self.client.stop(self.my_ports) - time.sleep(WAIT_TIME) + time.sleep(self.WAIT_TIME) last_res = self.client.get_stats(self.my_ports) - samples = {} - for vpci_idx in range(len(self._vpci_ascending)): - name = \ - self._get_logical_if_name(self._vpci_ascending[vpci_idx]) - # fixme: VNFDs KPIs values needs to be mapped to TRex structure - if not isinstance(last_res, dict): - terminated.value = 1 - last_res = {} + if not isinstance(last_res, Mapping): + self._terminated.value = 1 + continue + self.generate_samples('latency', {}) + self._queue.put(samples) + self.client.start(mult=str(multiplier), + ports=self.my_ports, + duration=120, force=True) - samples[name] = \ - {"rx_throughput_fps": - float(last_res.get(vpci_idx, {}).get("rx_pps", 0.0)), - "tx_throughput_fps": - float(last_res.get(vpci_idx, {}).get("tx_pps", 0.0)), - "rx_throughput_mbps": - float(last_res.get(vpci_idx, {}).get("rx_bps", 0.0)), - "tx_throughput_mbps": - float(last_res.get(vpci_idx, {}).get("tx_bps", 0.0)), - "in_packets": - last_res.get(vpci_idx, {}).get("ipackets", 0), - "out_packets": - last_res.get(vpci_idx, {}).get("opackets", 0)} + def start_client(self, mult, duration, force=True): + self.client.start(ports=self.my_ports, mult=mult, duration=duration, force=force) - samples = \ - traffic_profile.get_drop_percentage(self, samples, - tolerance[0], tolerance[1]) - queue.put(samples) - self.client.stop(self.my_ports) - self.client.disconnect() - queue.put(samples) + def clear_client_stats(self): + self.client.clear_stats(ports=self.my_ports) def collect_kpi(self): - if not self._queue.empty(): - result = self._queue.get() - self._result.update(result) - LOGGING.debug("trex collect Kpis %s", self._result) - return self._result + self.rfc2544_helper.iteration.value += 1 + super(TrexRfcResourceHelper, self).collect_kpi() + - def terminate(self): - self._terminated.value = 1 # stop Trex clinet +class TrexTrafficGenRFC(TrexTrafficGen): + """ + This class handles mapping traffic profile and generating + traffic for rfc2544 testcase. + """ - self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = TrexRfcResourceHelper - if self._traffic_process: - self._traffic_process.terminate() + super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py index 058b715fe..616b331ba 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py @@ -15,261 +15,136 @@ from __future__ import absolute_import from __future__ import print_function -import multiprocessing -import time + import logging import os + import yaml -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen +from yardstick.common.utils import mac_address_to_hex_list from yardstick.network_services.utils import get_nsb_option -from yardstick.network_services.utils import provision_tool -from stl.trex_stl_lib.trex_stl_client import STLClient -from stl.trex_stl_lib.trex_stl_client import LoggerApi -from stl.trex_stl_lib.trex_stl_exceptions import STLError +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen +from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper LOG = logging.getLogger(__name__) -DURATION = 30 -WAIT_QUEUE = 1 -TREX_SYNC_PORT = 4500 -TREX_ASYNC_PORT = 4501 -class TrexTrafficGen(GenericTrafficGen): +class TrexResourceHelper(ClientResourceHelper): + + CONF_FILE = '/tmp/trex_cfg.yaml' + QUEUE_WAIT_TIME = 1 + RESOURCE_WORD = 'trex' + RUN_DURATION = 0 + + SYNC_PORT = 4500 + ASYNC_PORT = 4501 + + def generate_cfg(self): + ext_intf = self.vnfd_helper.interfaces + vpci_list = [] + port_list = [] + trex_cfg = { + 'port_limit': 0, + 'version': '2', + 'interfaces': vpci_list, + 'port_info': port_list, + "port_limit": len(ext_intf), + "version": '2', + } + cfg_file = [trex_cfg] + + for interface in ext_intf: + virtual_interface = interface['virtual-interface'] + vpci_list.append(virtual_interface["vpci"]) + dst_mac = virtual_interface["dst_mac"] + + if not dst_mac: + continue + + local_mac = virtual_interface["local_mac"] + port_list.append({ + "src_mac": mac_address_to_hex_list(local_mac), + "dest_mac": mac_address_to_hex_list(dst_mac), + }) + + cfg_str = yaml.safe_dump(cfg_file, default_flow_style=False, explicit_start=True) + self.ssh_helper.upload_config_file(os.path.basename(self.CONF_FILE), cfg_str) + self._vpci_ascending = sorted(vpci_list) + + def check_status(self): + status, _, _ = self.ssh_helper.execute("sudo lsof -i:%s" % self.SYNC_PORT) + return status + + # temp disable + DISABLE_DEPLOY = True + + def setup(self): + if self.DISABLE_DEPLOY: + return + + trex_path = self.ssh_helper.join_bin_path('trex') + + err = self.ssh_helper.execute("which {}".format(trex_path))[0] + if err == 0: + return + + LOG.info("Copying %s to destination...", self.RESOURCE_WORD) + self.ssh_helper.run("sudo mkdir -p '{}'".format(os.path.dirname(trex_path))) + self.ssh_helper.put("~/.bash_profile", "~/.bash_profile") + self.ssh_helper.put(trex_path, trex_path, True) + ko_src = os.path.join(trex_path, "scripts/ko/src/") + self.ssh_helper.execute(self.MAKE_INSTALL.format(ko_src)) + + def start(self, ports=None, *args, **kwargs): + cmd = "sudo fuser -n tcp {0.SYNC_PORT} {0.ASYNC_PORT} -k > /dev/null 2>&1" + self.ssh_helper.execute(cmd.format(self)) + + self.ssh_helper.execute("sudo pkill -9 rex > /dev/null 2>&1") + + trex_path = self.ssh_helper.join_bin_path("trex", "scripts") + path = get_nsb_option("trex_path", trex_path) + + # cmd = "sudo ./t-rex-64 -i --cfg %s > /dev/null 2>&1" % self.CONF_FILE + cmd = "./t-rex-64 -i --cfg '{}'".format(self.CONF_FILE) + + # if there are errors we want to see them + # we have to sudo cd because the path might be owned by root + trex_cmd = """sudo bash -c "cd '{}' ; {}" >/dev/null""".format(path, cmd) + self.ssh_helper.execute(trex_cmd) + + def terminate(self): + super(TrexResourceHelper, self).terminate() + cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1" + self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT)) + + +class TrexTrafficGen(SampleVNFTrafficGen): """ This class handles mapping traffic profile and generating traffic for given testcase """ - def __init__(self, vnfd): - super(TrexTrafficGen, self).__init__(vnfd) - self._result = {} - self._queue = multiprocessing.Queue() - self._terminated = multiprocessing.Value('i', 0) - self._traffic_process = None - self._vpci_ascending = None - self.client = None - self.my_ports = None - self.client_started = multiprocessing.Value('i', 0) - - mgmt_interface = vnfd["mgmt-interface"] - - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - - @classmethod - def _split_mac_address_into_list(cls, mac): - octets = mac.split(':') - for i, elem in enumerate(octets): - octets[i] = "0x" + str(elem) - return octets - - def _generate_trex_cfg(self, vnfd): - """ - - :param vnfd: vnfd.yaml - :return: trex_cfg.yaml file - """ - trex_cfg = dict( - port_limit=0, - version='2', - interfaces=[], - port_info=list(dict( - )) - ) - trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"]) - trex_cfg["version"] = '2' - - cfg_file = [] - vpci = [] - port = {} - - for interface in range(len(vnfd["vdu"][0]["external-interface"])): - ext_intrf = vnfd["vdu"][0]["external-interface"] - virtual_interface = ext_intrf[interface]["virtual-interface"] - vpci.append(virtual_interface["vpci"]) - - port["src_mac"] = self._split_mac_address_into_list( - virtual_interface["local_mac"]) - port["dest_mac"] = self._split_mac_address_into_list( - virtual_interface["dst_mac"]) - - trex_cfg["port_info"].append(port.copy()) - - trex_cfg["interfaces"] = vpci - cfg_file.append(trex_cfg) - - with open('/tmp/trex_cfg.yaml', 'w') as outfile: - outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False)) - self.connection.put('/tmp/trex_cfg.yaml', '/etc') - - self._vpci_ascending = sorted(vpci) - - @classmethod - def __setup_hugepages(cls, connection): - hugepages = \ - connection.execute( - "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1] - hugepages = hugepages.rstrip() - - memory_path = \ - '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages - connection.execute("awk -F: '{ print $1 }' < %s" % memory_path) - - pages = 16384 if hugepages.rstrip() == "2048kB" else 16 - connection.execute("echo %s > %s" % (pages, memory_path)) - - def setup_vnf_environment(self, connection): - ''' setup dpdk environment needed for vnf to run ''' - - self.__setup_hugepages(connection) - connection.execute("modprobe uio && modprobe igb_uio") - - exit_status = connection.execute("lsmod | grep -i igb_uio")[0] - if exit_status == 0: - return + APP_NAME = 'TRex' - dpdk = os.path.join(self.bin_path, "dpdk-16.07") - dpdk_setup = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "nsb_setup.sh")) - status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0] - if status: - connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = TrexResourceHelper - def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(TrexTrafficGen, self).scale(flavor) - - def instantiate(self, scenario_cfg, context_cfg): - self._generate_trex_cfg(self.vnfd) - self.setup_vnf_environment(self.connection) - - trex = os.path.join(self.bin_path, "trex") - err = \ - self.connection.execute("ls {} >/dev/null 2>&1".format(trex))[0] - if err != 0: - LOG.info("Copying trex to destination...") - self.connection.put("/root/.bash_profile", "/root/.bash_profile") - self.connection.put(trex, trex, True) - ko_src = os.path.join(trex, "scripts/ko/src/") - self.connection.execute("cd %s && make && make install" % ko_src) - - LOG.info("Starting TRex server...") - _tg_process = \ - multiprocessing.Process(target=self._start_server) - _tg_process.start() - while True: - if not _tg_process.is_alive(): - raise RuntimeError("Traffic Generator process died.") - LOG.info("Waiting for TG Server to start.. ") - time.sleep(1) - status = \ - self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0] - if status == 0: - LOG.info("TG server is up and running.") - return _tg_process.exitcode - - def listen_traffic(self, traffic_profile): - pass + super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) - def _get_logical_if_name(self, vpci): - ext_intf = self.vnfd["vdu"][0]["external-interface"] - for interface in range(len(self.vnfd["vdu"][0]["external-interface"])): - virtual_intf = ext_intf[interface]["virtual-interface"] - if virtual_intf["vpci"] == vpci: - return ext_intf[interface]["name"] - - def run_traffic(self, traffic_profile): - self._traffic_process = \ - multiprocessing.Process(target=self._traffic_runner, - args=(traffic_profile, self._queue, - self.client_started, - self._terminated)) - self._traffic_process.start() - # Wait for traffic process to start - while self.client_started.value == 0: - time.sleep(1) - - return self._traffic_process.is_alive() + def _check_status(self): + return self.resource_helper.check_status() def _start_server(self): - mgmt_interface = self.vnfd["mgmt-interface"] - - _server = ssh.SSH.from_node(mgmt_interface) - _server.wait() + super(TrexTrafficGen, self)._start_server() + self.resource_helper.start() - _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) + def scale(self, flavor=""): + pass - trex_path = os.path.join(self.bin_path, "trex/scripts") - path = get_nsb_option("trex_path", trex_path) - trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1" - - _server.execute(trex_cmd) - - def _connect_client(self, client=None): - if client is None: - client = STLClient(username=self.vnfd["mgmt-interface"]["user"], - server=self.vnfd["mgmt-interface"]["ip"], - verbose_level=LoggerApi.VERBOSE_QUIET) - # try to connect with 5s intervals, 30s max - for idx in range(6): - try: - client.connect() - break - except STLError: - LOG.info("Unable to connect to Trex Server.. Attempt %s", idx) - time.sleep(5) - return client - - def _traffic_runner(self, traffic_profile, queue, - client_started, terminated): - LOG.info("Starting TRex client...") - - self.my_ports = [0, 1] - self.client = self._connect_client() - self.client.reset(ports=self.my_ports) - - self.client.remove_all_streams(self.my_ports) # remove all streams - - while not terminated.value: - traffic_profile.execute(self) - client_started.value = 1 - last_res = self.client.get_stats(self.my_ports) - if not isinstance(last_res, dict): # added for mock unit test - terminated.value = 1 - last_res = {} - - samples = {} - for vpci_idx in range(len(self._vpci_ascending)): - name = \ - self._get_logical_if_name(self._vpci_ascending[vpci_idx]) - # fixme: VNFDs KPIs values needs to be mapped to TRex structure - xe_value = last_res.get(vpci_idx, {}) - samples[name] = \ - {"rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), - "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), - "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), - "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), - "in_packets": xe_value.get("ipackets", 0), - "out_packets": xe_value.get("opackets", 0)} - time.sleep(WAIT_QUEUE) - queue.put(samples) - - self.client.disconnect() - terminated.value = 0 - - def collect_kpi(self): - if not self._queue.empty(): - self._result.update(self._queue.get()) - LOG.debug("trex collect Kpis %s", self._result) - return self._result + def listen_traffic(self, traffic_profile): + pass def terminate(self): - self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) - self.traffic_finished = True - if self._traffic_process: - self._traffic_process.terminate() + self.resource_helper.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index e9e80bdfb..310ab67cb 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -15,313 +15,268 @@ from __future__ import absolute_import from __future__ import print_function -import tempfile -import time import os import logging import re -from multiprocessing import Queue -import multiprocessing -import ipaddress -import six +import posixpath -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericVNF -from yardstick.network_services.utils import provision_tool -from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper -from yardstick.network_services.nfvi.resource import ResourceProfile +from six.moves import configparser, zip -LOG = logging.getLogger(__name__) -VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}' -CORES = ['0', '1', '2'] -WAIT_TIME = 20 +from yardstick.network_services.pipeline import PipelineRules +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper +LOG = logging.getLogger(__name__) -class VpeApproxVnf(GenericVNF): +VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}""" + +VPE_COLLECT_KPI = """\ +Pkts in:\s(\d+)\r\n\ +\tPkts dropped by Pkts in:\s(\d+)\r\n\ +\tPkts dropped by AH:\s(\d+)\r\n\\ +\tPkts dropped by other:\s(\d+)\ +""" + + +class ConfigCreate(object): + + @staticmethod + def vpe_tmq(config, index): + tm_q = 'TM{0}'.format(index) + config.add_section(tm_q) + config.set(tm_q, 'burst_read', '24') + config.set(tm_q, 'burst_write', '32') + config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg') + return config + + def __init__(self, priv_ports, pub_ports, socket): + super(ConfigCreate, self).__init__() + self.sw_q = -1 + self.sink_q = -1 + self.n_pipeline = 1 + self.priv_ports = priv_ports + self.pub_ports = pub_ports + self.pipeline_per_port = 9 + self.socket = socket + + def vpe_initialize(self, config): + config.add_section('EAL') + config.set('EAL', 'log_level', '0') + + config.add_section('PIPELINE0') + config.set('PIPELINE0', 'type', 'MASTER') + config.set('PIPELINE0', 'core', 's%sC0' % self.socket) + + config.add_section('MEMPOOL0') + config.set('MEMPOOL0', 'pool_size', '256K') + + config.add_section('MEMPOOL1') + config.set('MEMPOOL1', 'pool_size', '2M') + return config + + def vpe_rxq(self, config): + for port in self.pub_ports: + new_section = 'RXQ{0}.0'.format(port) + config.add_section(new_section) + config.set(new_section, 'mempool', 'MEMPOOL1') + + return config + + def get_sink_swq(self, parser, pipeline, k, index): + sink = "" + pktq = parser.get(pipeline, k) + if "SINK" in pktq: + self.sink_q += 1 + sink = " SINK{0}".format(self.sink_q) + if "TM" in pktq: + sink = " TM{0}".format(index) + pktq = "SWQ{0}{1}".format(self.sw_q, sink) + return pktq + + def vpe_upstream(self, vnf_cfg, intf): + parser = configparser.ConfigParser() + parser.read(os.path.join(vnf_cfg, 'vpe_upstream')) + for pipeline in parser.sections(): + for k, v in parser.items(pipeline): + if k == "pktq_in": + index = intf['index'] + if "RXQ" in v: + value = "RXQ{0}.0".format(index) + else: + value = self.get_sink_swq(parser, pipeline, k, index) + + parser.set(pipeline, k, value) + + elif k == "pktq_out": + index = intf['peer_intf']['index'] + if "TXQ" in v: + value = "TXQ{0}.0".format(index) + else: + self.sw_q += 1 + value = self.get_sink_swq(parser, pipeline, k, index) + + parser.set(pipeline, k, value) + + new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline) + if new_pipeline != pipeline: + parser._sections[new_pipeline] = parser._sections[pipeline] + parser._sections.pop(pipeline) + self.n_pipeline += 1 + return parser + + def vpe_downstream(self, vnf_cfg, intf): + parser = configparser.ConfigParser() + parser.read(os.path.join(vnf_cfg, 'vpe_downstream')) + for pipeline in parser.sections(): + for k, v in parser.items(pipeline): + index = intf['dpdk_port_num'] + peer_index = intf['peer_intf']['dpdk_port_num'] + + if k == "pktq_in": + if "RXQ" not in v: + value = self.get_sink_swq(parser, pipeline, k, index) + elif "TM" in v: + value = "RXQ{0}.0 TM{1}".format(peer_index, index) + else: + value = "RXQ{0}.0".format(peer_index) + + parser.set(pipeline, k, value) + + if k == "pktq_out": + if "TXQ" not in v: + self.sw_q += 1 + value = self.get_sink_swq(parser, pipeline, k, index) + elif "TM" in v: + value = "TXQ{0}.0 TM{1}".format(peer_index, index) + else: + value = "TXQ{0}.0".format(peer_index) + + parser.set(pipeline, k, value) + + new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline) + if new_pipeline != pipeline: + parser._sections[new_pipeline] = parser._sections[pipeline] + parser._sections.pop(pipeline) + self.n_pipeline += 1 + return parser + + def create_vpe_config(self, vnf_cfg): + config = configparser.ConfigParser() + vpe_cfg = os.path.join("/tmp/vpe_config") + with open(vpe_cfg, 'w') as cfg_file: + config = self.vpe_initialize(config) + config = self.vpe_rxq(config) + config.write(cfg_file) + for index, priv_port in enumerate(self.priv_ports): + config = self.vpe_upstream(vnf_cfg, priv_port) + config.write(cfg_file) + config = self.vpe_downstream(vnf_cfg, priv_port) + config = self.vpe_tmq(config, index) + config.write(cfg_file) + + def generate_vpe_script(self, interfaces): + rules = PipelineRules(pipeline_id=1) + for priv_port, pub_port in zip(self.priv_ports, self.pub_ports): + priv_intf = interfaces[priv_port]["virtual-interface"] + pub_intf = interfaces[pub_port]["virtual-interface"] + + dst_port0_ip = priv_intf["dst_ip"] + dst_port1_ip = pub_intf["dst_ip"] + dst_port0_mac = priv_intf["dst_mac"] + dst_port1_mac = pub_intf["dst_mac"] + + rules.add_firewall_script(dst_port0_ip) + rules.next_pipeline() + rules.add_flow_classification_script() + rules.next_pipeline() + rules.add_flow_action() + rules.next_pipeline() + rules.add_flow_action2() + rules.next_pipeline() + rules.add_route_script(dst_port1_ip, dst_port1_mac) + rules.next_pipeline() + rules.add_route_script2(dst_port0_ip, dst_port0_mac) + rules.next_pipeline(num=4) + + return rules.get_string() + + +class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): + + CFG_CONFIG = "/tmp/vpe_config" + CFG_SCRIPT = "/tmp/vpe_script" + CORES = ['0', '1', '2', '3', '4', '5'] + PIPELINE_COMMAND = VPE_PIPELINE_COMMAND + + def build_config(self): + vpe_vars = { + "bin_path": self.ssh_helper.bin_path, + "socket": self.socket, + } + + all_ports = [] + priv_ports = [] + pub_ports = [] + for interface in self.vnfd_helper.interfaces: + all_ports.append(interface['name']) + vld_id = interface['virtual-interface']['vld_id'] + if vld_id.startswith('private'): + priv_ports.append(interface) + elif vld_id.startswith('public'): + pub_ports.append(interface) + + vpe_conf = ConfigCreate(priv_ports, pub_ports, self.socket) + vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg) + + config_basename = posixpath.basename(self.CFG_CONFIG) + script_basename = posixpath.basename(self.CFG_SCRIPT) + with open(self.CFG_CONFIG) as handle: + vpe_config = handle.read() + + self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars)) + + vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces) + self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars)) + + +class VpeApproxVnf(SampleVNF): """ This class handles vPE VNF model-driver definitions """ - def __init__(self, vnfd): - super(VpeApproxVnf, self).__init__(vnfd) - self.socket = None - self.q_in = Queue() - self.q_out = Queue() - self.vnf_cfg = None - self._vnf_process = None - self.connection = None - self.resource = None - - def _resource_collect_start(self): - self.resource.initiate_systemagent(self.bin_path) - self.resource.start() + APP_NAME = 'vPE_vnf' + APP_WORD = 'vpe' + COLLECT_KPI = VPE_COLLECT_KPI + WAIT_TIME = 20 - def _resource_collect_stop(self): - self.resource.stop() + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = VpeApproxSetupEnvHelper - def _collect_resource_kpi(self): - result = {} + super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) - status = self.resource.check_if_sa_running("collectd")[0] - if status: - result = self.resource.amqp_collect_nfvi_kpi() - - result = {"core": result} - - return result - - @classmethod - def __setup_hugepages(cls, connection): - hugepages = \ - connection.execute( - "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1] - hugepages = hugepages.rstrip() - - memory_path = \ - '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages - connection.execute("awk -F: '{ print $1 }' < %s" % memory_path) - - pages = 16384 if hugepages.rstrip() == "2048kB" else 16 - connection.execute("echo %s > %s" % (pages, memory_path)) - - def setup_vnf_environment(self, connection): - ''' setup dpdk environment needed for vnf to run ''' - - self.__setup_hugepages(connection) - connection.execute("modprobe uio && modprobe igb_uio") - - exit_status = connection.execute("lsmod | grep -i igb_uio")[0] - if exit_status == 0: - return - - dpdk = os.path.join(self.bin_path, "dpdk-16.07") - dpdk_setup = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "nsb_setup.sh")) - status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0] - if status: - connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) - - def _get_cpu_sibling_list(self): - cpu_topo = [] - for core in CORES: - sys_cmd = \ - "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \ - % core - cpuid = \ - self.connection.execute("awk -F: '{ print $1 }' < %s" % - sys_cmd)[1] - cpu_topo += \ - [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')] - - return [cpu.strip() for cpu in cpu_topo] - - def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(VpeApproxVnf, self).scale(flavor) - - def instantiate(self, scenario_cfg, context_cfg): - vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg'] - - mgmt_interface = self.vnfd["mgmt-interface"] - self.connection = ssh.SSH.from_node(mgmt_interface) - - self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc']) - - self.setup_vnf_environment(self.connection) - - cores = self._get_cpu_sibling_list() - self.resource = ResourceProfile(self.vnfd, cores) - - self.connection.execute("pkill vPE_vnf") - dpdk_nic_bind = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "dpdk_nic_bind.py")) - - interfaces = self.vnfd["vdu"][0]['external-interface'] - self.socket = \ - next((0 for v in interfaces - if v['virtual-interface']["vpci"][5] == "0"), 1) - - bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] - for vpci in bound_pci: - self.connection.execute( - "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci)) - queue_wrapper = \ - QueueFileWrapper(self.q_in, self.q_out, "pipeline>") - self._vnf_process = multiprocessing.Process(target=self._run_vpe, - args=(queue_wrapper, - vnf_cfg,)) - self._vnf_process.start() - buf = [] - time.sleep(WAIT_TIME) # Give some time for config to load - while True: - message = '' - while self.q_out.qsize() > 0: - buf.append(self.q_out.get()) - message = ''.join(buf) - if "pipeline>" in message: - LOG.info("VPE VNF is up and running.") - queue_wrapper.clear() - self._resource_collect_start() - return self._vnf_process.exitcode - if "PANIC" in message: - raise RuntimeError("Error starting vPE VNF.") - - LOG.info("Waiting for VNF to start.. ") - time.sleep(3) - if not self._vnf_process.is_alive(): - raise RuntimeError("vPE VNF process died.") - - def _get_ports_gateway(self, name): - if 'routing_table' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['routing_table'] - - for route in routing_table: - if name == route['if']: - return route['gateway'] - - def terminate(self): - self.execute_command("quit") - if self._vnf_process: - self._vnf_process.terminate() - - def _run_vpe(self, filewrapper, vnf_cfg): - mgmt_interface = self.vnfd["mgmt-interface"] - - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - - interfaces = self.vnfd["vdu"][0]['external-interface'] - port0_ip = ipaddress.ip_interface(six.text_type( - "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"], - interfaces[0]["virtual-interface"]["netmask"]))) - port1_ip = ipaddress.ip_interface(six.text_type( - "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"], - interfaces[1]["virtual-interface"]["netmask"]))) - dst_port0_ip = ipaddress.ip_interface( - u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"], - interfaces[0]["virtual-interface"]["netmask"])) - dst_port1_ip = ipaddress.ip_interface( - u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"], - interfaces[1]["virtual-interface"]["netmask"])) - - vpe_vars = {"port0_local_ip": port0_ip.ip.exploded, - "port0_dst_ip": dst_port0_ip.ip.exploded, - "port0_local_ip_hex": - self._ip_to_hex(port0_ip.ip.exploded), - "port0_prefixlen": port0_ip.network.prefixlen, - "port0_netmask": port0_ip.network.netmask.exploded, - "port0_netmask_hex": - self._ip_to_hex(port0_ip.network.netmask.exploded), - "port0_local_mac": - interfaces[0]["virtual-interface"]["local_mac"], - "port0_dst_mac": - interfaces[0]["virtual-interface"]["dst_mac"], - "port0_gateway": - self._get_ports_gateway(interfaces[0]["name"]), - "port0_local_network": - port0_ip.network.network_address.exploded, - "port0_prefix": port0_ip.network.prefixlen, - "port1_local_ip": port1_ip.ip.exploded, - "port1_dst_ip": dst_port1_ip.ip.exploded, - "port1_local_ip_hex": - self._ip_to_hex(port1_ip.ip.exploded), - "port1_prefixlen": port1_ip.network.prefixlen, - "port1_netmask": port1_ip.network.netmask.exploded, - "port1_netmask_hex": - self._ip_to_hex(port1_ip.network.netmask.exploded), - "port1_local_mac": - interfaces[1]["virtual-interface"]["local_mac"], - "port1_dst_mac": - interfaces[1]["virtual-interface"]["dst_mac"], - "port1_gateway": - self._get_ports_gateway(interfaces[1]["name"]), - "port1_local_network": - port1_ip.network.network_address.exploded, - "port1_prefix": port1_ip.network.prefixlen, - "port0_local_ip6": self._get_port0localip6(), - "port1_local_ip6": self._get_port1localip6(), - "port0_prefixlen6": self._get_port0prefixlen6(), - "port1_prefixlen6": self._get_port1prefixlen6(), - "port0_gateway6": self._get_port0gateway6(), - "port1_gateway6": self._get_port1gateway6(), - "port0_dst_ip_hex6": self._get_port0localip6(), - "port1_dst_ip_hex6": self._get_port1localip6(), - "port0_dst_netmask_hex6": self._get_port0prefixlen6(), - "port1_dst_netmask_hex6": self._get_port1prefixlen6(), - "bin_path": self.bin_path, - "socket": self.socket} - - for cfg in os.listdir(vnf_cfg): - vpe_config = "" - with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg: - vpe_config = vpe_cfg.read() - - self._provide_config_file(cfg, vpe_config, vpe_vars) - - LOG.info("Provision and start the vPE") - tool_path = provision_tool(self.connection, - os.path.join(self.bin_path, "vPE_vnf")) - cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config", - script="/tmp/vpe_script", - tool_path=tool_path) - self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper, - keep_stdin_open=True, pty=True) - - def _provide_config_file(self, prefix, template, args): - cfg, cfg_content = tempfile.mkstemp() - cfg = os.fdopen(cfg, "w+") - cfg.write(template.format(**args)) - cfg.close() - cfg_file = "/tmp/%s" % prefix - self.connection.put(cfg_content, cfg_file) - return cfg_file - - def execute_command(self, cmd): - ''' send cmd to vnf process ''' - LOG.info("VPE command: %s", cmd) - output = [] - if self.q_in: - self.q_in.put(cmd + "\r\n") - time.sleep(3) - while self.q_out.qsize() > 0: - output.append(self.q_out.get()) - return "".join(output) + def get_stats(self, *args, **kwargs): + raise NotImplementedError def collect_kpi(self): - result = self.get_stats_vpe() - collect_stats = self._collect_resource_kpi() - result["collect_stats"] = collect_stats - LOG.debug("vPE collet Kpis: %s", result) - return result - - def get_stats_vpe(self): - ''' get vpe statistics ''' - result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0, - 'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0} - up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0', - 'p 5 stats port out 1'] - down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0'] - pattern = \ - "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \ - "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)" - - for cmd in up_stat_commands: - stats = self.execute_command(cmd) - match = re.search(pattern, stats, re.MULTILINE) - if match: - result["pkt_in_up_stream"] = \ - result.get("pkt_in_up_stream", 0) + int(match.group(1)) - result["pkt_drop_up_stream"] = \ - result.get("pkt_drop_up_stream", 0) + \ - int(match.group(2)) + int(match.group(3)) - - for cmd in down_stat_commands: - stats = self.execute_command(cmd) - match = re.search(pattern, stats, re.MULTILINE) - if match: - result["pkt_in_down_stream"] = \ - result.get("pkt_in_down_stream", 0) + int(match.group(1)) - result["pkt_drop_down_stream"] = \ - result.get("pkt_drop_down_stream", 0) + \ - int(match.group(2)) + int(match.group(3)) + result = { + 'pkt_in_up_stream': 0, + 'pkt_drop_up_stream': 0, + 'pkt_in_down_stream': 0, + 'pkt_drop_down_stream': 0, + 'collect_stats': self.resource_helper.collect_kpi(), + } + + indexes_in = [1] + indexes_drop = [2, 3] + command = 'p {0} stats port {1} 0' + for index, direction in ((5, 'up'), (9, 'down')): + key_in = "pkt_in_{0}_stream".format(direction) + key_drop = "pkt_drop_{0}_stream".format(direction) + for mode in ('in', 'out'): + stats = self.vnf_execute(command.format(index, mode)) + match = re.search(self.COLLECT_KPI, stats, re.MULTILINE) + if not match: + continue + result[key_in] += sum(int(match.group(x)) for x in indexes_in) + result[key_drop] += sum(int(match.group(x)) for x in indexes_drop) + + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result |