aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
diff options
context:
space:
mode:
Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf/sample_vnf.py')
-rw-r--r--yardstick/network_services/vnf_generic/vnf/sample_vnf.py994
1 files changed, 994 insertions, 0 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
new file mode 100644
index 000000000..89c086d97
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -0,0 +1,994 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Base class implementation for generic vnf implementation """
+
+from __future__ import absolute_import
+
+import posixpath
+import time
+import logging
+import os
+import re
+import subprocess
+from collections import Mapping
+
+from multiprocessing import Queue, Value, Process
+
+from six.moves import cStringIO
+
+from yardstick.benchmark.contexts.base import Context
+from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.network_services.helpers.cpu import CpuSysCores
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
+from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
+from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.network_services.utils import get_nsb_option
+
+from stl.trex_stl_lib.trex_stl_client import STLClient
+from stl.trex_stl_lib.trex_stl_client import LoggerApi
+from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError
+
+from yardstick.ssh import AutoConnectSSH
+
+DPDK_VERSION = "dpdk-16.07"
+
+LOG = logging.getLogger(__name__)
+
+
+REMOTE_TMP = "/tmp"
+
+
+class VnfSshHelper(AutoConnectSSH):
+
+ def __init__(self, node, bin_path, wait=None):
+ self.node = node
+ kwargs = self.args_from_node(self.node)
+ if wait:
+ kwargs.setdefault('wait', wait)
+
+ super(VnfSshHelper, self).__init__(**kwargs)
+ self.bin_path = bin_path
+
+ @staticmethod
+ def get_class():
+ # must return static class name, anything else refers to the calling class
+ # i.e. the subclass, not the superclass
+ return VnfSshHelper
+
+ def copy(self):
+ # this copy constructor is different from SSH classes, since it uses node
+ return self.get_class()(self.node, self.bin_path)
+
+ def upload_config_file(self, prefix, content):
+ cfg_file = os.path.join(REMOTE_TMP, prefix)
+ LOG.debug(content)
+ file_obj = cStringIO(content)
+ self.put_file_obj(file_obj, cfg_file)
+ return cfg_file
+
+ def join_bin_path(self, *args):
+ return os.path.join(self.bin_path, *args)
+
+ def provision_tool(self, tool_path=None, tool_file=None):
+ if tool_path is None:
+ tool_path = self.bin_path
+ return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
+
+
+class SetupEnvHelper(object):
+
+ CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
+ CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
+ CORES = []
+ DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
+ PIPELINE_COMMAND = ''
+ VNF_TYPE = "SAMPLE"
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(SetupEnvHelper, self).__init__()
+ self.vnfd_helper = vnfd_helper
+ self.ssh_helper = ssh_helper
+ self.scenario_helper = scenario_helper
+
+ def _get_ports_gateway(self, name):
+ routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
+ for route in routing_table:
+ if name == route['if']:
+ return route['gateway']
+ return None
+
+ def build_config(self):
+ raise NotImplementedError
+
+ def setup_vnf_environment(self):
+ pass
+ # raise NotImplementedError
+
+ def tear_down(self):
+ raise NotImplementedError
+
+
+class DpdkVnfSetupEnvHelper(SetupEnvHelper):
+
+ APP_NAME = 'DpdkVnf'
+ DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
+ DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
+ FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
+
+ HW_DEFAULT_CORE = 3
+ SW_DEFAULT_CORE = 2
+
+ DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")
+
+ @staticmethod
+ def _update_packet_type(ip_pipeline_cfg, traffic_options):
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ @classmethod
+ def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
+ traffic_type = traffic_options['traffic_type']
+
+ if traffic_options['vnf_type'] is not cls.APP_NAME:
+ match_str = 'traffic_type = 4'
+ replace_str = 'traffic_type = {0}'.format(traffic_type)
+
+ elif traffic_type == 4:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv4'
+
+ else:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv6'
+
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
+ self.all_ports = None
+ self.bound_pci = None
+ self._dpdk_nic_bind = None
+ self.socket = None
+
+ @property
+ def dpdk_nic_bind(self):
+ if self._dpdk_nic_bind is None:
+ self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
+ return self._dpdk_nic_bind
+
+ def _setup_hugepages(self):
+ cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
+ hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
+
+ memory_path = \
+ '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
+ self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
+
+ if hugepages == "2048kB":
+ pages = 16384
+ else:
+ pages = 16
+
+ self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
+
+ def _get_dpdk_port_num(self, name):
+ interface = self.vnfd_helper.find_interface(name=name)
+ return interface['virtual-interface']['dpdk_port_num']
+
+ def build_config(self):
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ task_path = self.scenario_helper.task_path
+
+ lb_count = vnf_cfg.get('lb_count', 3)
+ lb_config = vnf_cfg.get('lb_config', 'SW')
+ worker_config = vnf_cfg.get('worker_config', '1C/1T')
+ worker_threads = vnf_cfg.get('worker_threads', 3)
+
+ traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
+ traffic_options = {
+ 'traffic_type': traffic_type,
+ 'pkt_type': 'ipv%s' % traffic_type,
+ 'vnf_type': self.VNF_TYPE,
+ }
+
+ config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ multiport = MultiPortConfig(self.scenario_helper.topology,
+ config_tpl_cfg,
+ config_basename,
+ self.vnfd_helper.interfaces,
+ self.VNF_TYPE,
+ lb_count,
+ worker_threads,
+ worker_config,
+ lb_config,
+ self.socket)
+
+ multiport.generate_config()
+ with open(self.CFG_CONFIG) as handle:
+ new_config = handle.read()
+
+ new_config = self._update_traffic_type(new_config, traffic_options)
+ new_config = self._update_packet_type(new_config, traffic_options)
+
+ self.ssh_helper.upload_config_file(config_basename, new_config)
+ self.ssh_helper.upload_config_file(script_basename,
+ multiport.generate_script(self.vnfd_helper))
+ self.all_ports = multiport.port_pair_list
+
+ LOG.info("Provision and start the %s", self.APP_NAME)
+ self._build_pipeline_kwargs()
+ return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
+
+ def _build_pipeline_kwargs(self):
+ tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
+ ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
+ self.pipeline_kwargs = {
+ 'cfg_file': self.CFG_CONFIG,
+ 'script': self.CFG_SCRIPT,
+ 'ports_len_hex': ports_len_hex,
+ 'tool_path': tool_path,
+ }
+
+ def _get_app_cpu(self):
+ if self.CORES:
+ return self.CORES
+
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ sys_obj = CpuSysCores(self.ssh_helper)
+ self.sys_cpu = sys_obj.get_core_socket()
+ num_core = int(vnf_cfg["worker_threads"])
+ if vnf_cfg.get("lb_config", "SW") == 'HW':
+ num_core += self.HW_DEFAULT_CORE
+ else:
+ num_core += self.SW_DEFAULT_CORE
+ app_cpu = self.sys_cpu[str(self.socket)][:num_core]
+ return app_cpu
+
+ def _get_cpu_sibling_list(self, cores=None):
+ if cores is None:
+ cores = self._get_app_cpu()
+ sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
+ awk_template = "awk -F: '{ print $1 }' < %s"
+ sys_path = "/sys/devices/system/cpu/"
+ cpu_topology = []
+ try:
+ for core in cores:
+ sys_cmd = sys_cmd_template % (sys_path, core)
+ cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
+ cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
+
+ return cpu_topology
+ except Exception:
+ return []
+
+ def _validate_cpu_cfg(self):
+ return self._get_cpu_sibling_list()
+
+ def _find_used_drivers(self):
+ cmd = "{0} -s".format(self.dpdk_nic_bind)
+ rc, dpdk_status, _ = self.ssh_helper.execute(cmd)
+
+ self.used_drivers = {
+ vpci: (index, driver)
+ for index, (vpci, driver)
+ in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
+ if any(b.endswith(vpci) for b in self.bound_pci)
+ }
+
+ def setup_vnf_environment(self):
+ self._setup_dpdk()
+ resource = self._setup_resources()
+ self._kill_vnf()
+ self._detect_drivers()
+ return resource
+
+ def _kill_vnf(self):
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+
+ def _setup_dpdk(self):
+ """ setup dpdk environment needed for vnf to run """
+
+ self._setup_hugepages()
+ self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
+
+ exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
+ if exit_status == 0:
+ return
+
+ dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
+ dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
+ exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
+ if exit_status != 0:
+ self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+
+ def _setup_resources(self):
+ interfaces = self.vnfd_helper.interfaces
+ self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
+
+ # what is this magic? how do we know which socket is for which port?
+ # what about quad-socket?
+ if any(v[5] == "0" for v in self.bound_pci):
+ self.socket = 0
+ else:
+ self.socket = 1
+
+ cores = self._validate_cpu_cfg()
+ return ResourceProfile(self.vnfd_helper, cores)
+
+ def _detect_drivers(self):
+ interfaces = self.vnfd_helper.interfaces
+
+ self._find_used_drivers()
+ for vpci, (index, _) in self.used_drivers.items():
+ try:
+ intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
+ except StopIteration:
+ pass
+ else:
+ intf1['dpdk_port_num'] = index
+
+ for vpci in self.bound_pci:
+ self._bind_dpdk('igb_uio', vpci)
+ time.sleep(2)
+
+ def _bind_dpdk(self, driver, vpci, force=True):
+ if force:
+ force = '--force '
+ else:
+ force = ''
+ cmd = self.DPDK_BIND_CMD.format(force=force,
+ dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci)
+ self.ssh_helper.execute(cmd)
+
+ def _detect_and_bind_dpdk(self, vpci, driver):
+ find_net_cmd = self.FIND_NET_CMD.format(vpci)
+ exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status == 0:
+ # already bound
+ return None
+ self._bind_dpdk(driver, vpci)
+ exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status != 0:
+ # failed to bind
+ return None
+ return stdout
+
+ def _bind_kernel_devices(self):
+ for intf in self.vnfd_helper.interfaces:
+ vi = intf["virtual-interface"]
+ stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
+ if stdout is not None:
+ vi["local_iface_name"] = posixpath.basename(stdout)
+
+ def tear_down(self):
+ for vpci, (_, driver) in self.used_drivers.items():
+ self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci))
+
+
+class ResourceHelper(object):
+
+ COLLECT_KPI = ''
+ MAKE_INSTALL = 'cd {0} && make && sudo make install'
+ RESOURCE_WORD = 'sample'
+
+ COLLECT_MAP = {}
+
+ def __init__(self, setup_helper):
+ super(ResourceHelper, self).__init__()
+ self.resource = None
+ self.setup_helper = setup_helper
+ self.ssh_helper = setup_helper.ssh_helper
+
+ def setup(self):
+ self.resource = self.setup_helper.setup_vnf_environment()
+
+ def generate_cfg(self):
+ pass
+
+ def _collect_resource_kpi(self):
+ result = {}
+ status = self.resource.check_if_sa_running("collectd")[0]
+ if status:
+ result = self.resource.amqp_collect_nfvi_kpi()
+
+ result = {"core": result}
+ return result
+
+ def start_collect(self):
+ self.resource.initiate_systemagent(self.ssh_helper.bin_path)
+ self.resource.start()
+ self.resource.amqp_process_for_nfvi_kpi()
+
+ def stop_collect(self):
+ if self.resource:
+ self.resource.stop()
+
+ def collect_kpi(self):
+ return self._collect_resource_kpi()
+
+
+class ClientResourceHelper(ResourceHelper):
+
+ RUN_DURATION = 60
+ QUEUE_WAIT_TIME = 5
+ SYNC_PORT = 1
+ ASYNC_PORT = 2
+
+ def __init__(self, setup_helper):
+ super(ClientResourceHelper, self).__init__(setup_helper)
+ self.vnfd_helper = setup_helper.vnfd_helper
+ self.scenario_helper = setup_helper.scenario_helper
+
+ self.client = None
+ self.client_started = Value('i', 0)
+ self.my_ports = None
+ self._queue = Queue()
+ self._result = {}
+ self._terminated = Value('i', 0)
+ self._vpci_ascending = None
+
+ def _build_ports(self):
+ self.my_ports = [0, 1]
+
+ def get_stats(self, *args, **kwargs):
+ try:
+ return self.client.get_stats(*args, **kwargs)
+ except STLStateError:
+ LOG.exception("TRex client not connected")
+ return {}
+
+ def generate_samples(self, key=None, default=None):
+ last_result = self.get_stats(self.my_ports)
+ key_value = last_result.get(key, default)
+
+ if not isinstance(last_result, Mapping): # added for mock unit test
+ self._terminated.value = 1
+ return {}
+
+ samples = {}
+ for vpci_idx, vpci in enumerate(self._vpci_ascending):
+ name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
+ # fixme: VNFDs KPIs values needs to be mapped to TRex structure
+ xe_value = last_result.get(vpci_idx, {})
+ samples[name] = {
+ "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
+ "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
+ "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
+ "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
+ "in_packets": int(xe_value.get("ipackets", 0)),
+ "out_packets": int(xe_value.get("opackets", 0)),
+ }
+ if key:
+ samples[name][key] = key_value
+ return samples
+
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ samples = self.generate_samples()
+ time.sleep(self.QUEUE_WAIT_TIME)
+ self._queue.put(samples)
+
+ def run_traffic(self, traffic_profile):
+ # fixme: fix passing correct trex config file,
+ # instead of searching the default path
+ self._build_ports()
+ self.client = self._connect()
+ self.client.reset(ports=self.my_ports)
+ self.client.remove_all_streams(self.my_ports) # remove all streams
+ traffic_profile.register_generator(self)
+
+ while self._terminated.value == 0:
+ self._run_traffic_once(traffic_profile)
+
+ self.client.stop(self.my_ports)
+ self.client.disconnect()
+ self._terminated.value = 0
+
+ def terminate(self):
+ self._terminated.value = 1 # stop client
+
+ def clear_stats(self, ports=None):
+ if ports is None:
+ ports = self.my_ports
+ self.client.clear_stats(ports=ports)
+
+ def start(self, ports=None, *args, **kwargs):
+ if ports is None:
+ ports = self.my_ports
+ self.client.start(ports=ports, *args, **kwargs)
+
+ def collect_kpi(self):
+ if not self._queue.empty():
+ kpi = self._queue.get()
+ self._result.update(kpi)
+ LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result))
+ return self._result
+
+ def _connect(self, client=None):
+ if client is None:
+ client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
+ server=self.vnfd_helper.mgmt_interface["ip"],
+ verbose_level=LoggerApi.VERBOSE_QUIET)
+
+ # try to connect with 5s intervals, 30s max
+ for idx in range(6):
+ try:
+ client.connect()
+ break
+ except STLError:
+ LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
+ time.sleep(5)
+ return client
+
+
+class Rfc2544ResourceHelper(object):
+
+ DEFAULT_CORRELATED_TRAFFIC = False
+ DEFAULT_LATENCY = False
+ DEFAULT_TOLERANCE = '0.0001 - 0.0001'
+
+ def __init__(self, scenario_helper):
+ super(Rfc2544ResourceHelper, self).__init__()
+ self.scenario_helper = scenario_helper
+ self._correlated_traffic = None
+ self.iteration = Value('i', 0)
+ self._latency = None
+ self._rfc2544 = None
+ self._tolerance_low = None
+ self._tolerance_high = None
+
+ @property
+ def rfc2544(self):
+ if self._rfc2544 is None:
+ self._rfc2544 = self.scenario_helper.all_options['rfc2544']
+ return self._rfc2544
+
+ @property
+ def tolerance_low(self):
+ if self._tolerance_low is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_low
+
+ @property
+ def tolerance_high(self):
+ if self._tolerance_high is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_high
+
+ @property
+ def correlated_traffic(self):
+ if self._correlated_traffic is None:
+ self._correlated_traffic = \
+ self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
+
+ return self._correlated_traffic
+
+ @property
+ def latency(self):
+ if self._latency is None:
+ self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
+ return self._latency
+
+ def get_rfc2544(self, name, default=None):
+ return self.rfc2544.get(name, default)
+
+ def get_rfc_tolerance(self):
+ tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
+ tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
+ self._tolerance_low = next(tolerance_iter)
+ self._tolerance_high = next(tolerance_iter, self.tolerance_low)
+
+
+class SampleVNFDeployHelper(object):
+
+ SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
+ REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
+ SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
+
+ def __init__(self, vnfd_helper, ssh_helper):
+ super(SampleVNFDeployHelper, self).__init__()
+ self.ssh_helper = ssh_helper
+ self.vnfd_helper = vnfd_helper
+
+ DISABLE_DEPLOY = True
+
+ def deploy_vnfs(self, app_name):
+ # temp disable for now
+ if self.DISABLE_DEPLOY:
+ return
+
+ vnf_bin = self.ssh_helper.join_bin_path(app_name)
+ exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
+ if not exit_status:
+ return
+
+ subprocess.check_output(["rm", "-rf", self.REPO_NAME])
+ subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
+ time.sleep(2)
+ self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
+ self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
+
+ build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
+ time.sleep(2)
+ http_proxy = os.environ.get('http_proxy', '')
+ https_proxy = os.environ.get('https_proxy', '')
+ cmd = "sudo -E %s --silent '%s' '%s'" % (build_script, http_proxy, https_proxy)
+ LOG.debug(cmd)
+ self.ssh_helper.execute(cmd)
+ vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
+ self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
+ self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
+
+
+class ScenarioHelper(object):
+
+ DEFAULT_VNF_CFG = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1,
+ }
+
+ def __init__(self, name):
+ self.name = name
+ self.scenario_cfg = None
+
+ @property
+ def task_path(self):
+ return self.scenario_cfg["task_path"]
+
+ @property
+ def nodes(self):
+ return self.scenario_cfg['nodes']
+
+ @property
+ def all_options(self):
+ return self.scenario_cfg["options"]
+
+ @property
+ def options(self):
+ return self.all_options[self.name]
+
+ @property
+ def vnf_cfg(self):
+ return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
+
+ @property
+ def topology(self):
+ return self.scenario_cfg['topology']
+
+
+class SampleVNF(GenericVNF):
+ """ Class providing file-like API for generic VNF implementation """
+
+ VNF_PROMPT = "pipeline>"
+ WAIT_TIME = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNF, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.all_ports = None
+ self.context_cfg = None
+ self.nfvi_context = None
+ self.pipeline_kwargs = {}
+ self.priv_ports = None
+ self.pub_ports = None
+ # TODO(esm): make QueueFileWrapper invert-able so that we
+ # never have to manage the queues
+ self.q_in = Queue()
+ self.q_out = Queue()
+ self.queue_wrapper = None
+ self.run_kwargs = {}
+ self.scenario_cfg = None
+ self.tg_port_pairs = None
+ self.used_drivers = {}
+ self.vnf_port_pairs = None
+ self._vnf_process = None
+
+ def _get_route_data(self, route_index, route_type):
+ route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
+ for _ in range(route_index):
+ next(route_iter, '')
+ return next(route_iter, {}).get(route_type, '')
+
+ def _get_port0localip6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0localip6 : %s", return_value)
+ return return_value
+
+ def _get_port1localip6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1localip6 : %s", return_value)
+ return return_value
+
+ def _get_port0prefixlen6(self):
+ return_value = self._get_route_data(0, 'netmask')
+ LOG.info("_get_port0prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port1prefixlen6(self):
+ return_value = self._get_route_data(1, 'netmask')
+ LOG.info("_get_port1prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port0gateway6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0gateway6 : %s", return_value)
+ return return_value
+
+ def _get_port1gateway6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1gateway6 : %s", return_value)
+ return return_value
+
+ def _start_vnf(self):
+ self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
+ self._vnf_process = Process(target=self._run)
+ self._vnf_process.start()
+
+ def _vnf_up_post(self):
+ pass
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.context_cfg = context_cfg
+ self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
+ # self.nfvi_context = None
+
+ self.deploy_helper.deploy_vnfs(self.APP_NAME)
+ self.resource_helper.setup()
+ self._start_vnf()
+
+ def wait_for_instantiate(self):
+ buf = []
+ time.sleep(self.WAIT_TIME) # Give some time for config to load
+ while True:
+ if not self._vnf_process.is_alive():
+ raise RuntimeError("%s VNF process died." % self.APP_NAME)
+
+ # TODO(esm): move to QueueFileWrapper
+ while self.q_out.qsize() > 0:
+ buf.append(self.q_out.get())
+ message = ''.join(buf)
+ if self.VNF_PROMPT in message:
+ LOG.info("%s VNF is up and running.", self.APP_NAME)
+ self._vnf_up_post()
+ self.queue_wrapper.clear()
+ self.resource_helper.start_collect()
+ return self._vnf_process.exitcode
+
+ if "PANIC" in message:
+ raise RuntimeError("Error starting %s VNF." %
+ self.APP_NAME)
+
+ LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
+ time.sleep(1)
+
+ def _build_run_kwargs(self):
+ self.run_kwargs = {
+ 'stdin': self.queue_wrapper,
+ 'stdout': self.queue_wrapper,
+ 'keep_stdin_open': True,
+ 'pty': True,
+ }
+
+ def _build_config(self):
+ return self.setup_helper.build_config()
+
+ def _run(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+ cmd = self._build_config()
+ # kill before starting
+ self.ssh_helper.execute("pkill {}".format(self.APP_NAME))
+
+ LOG.debug(cmd)
+ self._build_run_kwargs()
+ self.ssh_helper.run(cmd, **self.run_kwargs)
+
+ def vnf_execute(self, cmd, wait_time=2):
+ """ send cmd to vnf process """
+
+ LOG.info("%s command: %s", self.APP_NAME, cmd)
+ self.q_in.put("{}\r\n".format(cmd))
+ time.sleep(wait_time)
+ output = []
+ while self.q_out.qsize() > 0:
+ output.append(self.q_out.get())
+ return "".join(output)
+
+ def _tear_down(self):
+ pass
+
+ def terminate(self):
+ self.vnf_execute("quit")
+ if self._vnf_process:
+ self._vnf_process.terminate()
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+ self._tear_down()
+ self.resource_helper.stop_collect()
+
+ def get_stats(self, *args, **kwargs):
+ """
+ Method for checking the statistics
+
+ :return:
+ VNF statistics
+ """
+ cmd = 'p {0} stats'.format(self.APP_WORD)
+ out = self.vnf_execute(cmd)
+ return out
+
+ def collect_kpi(self):
+ stats = self.get_stats()
+ m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if m:
+ result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
+ result["collect_stats"] = self.resource_helper.collect_kpi()
+ else:
+ result = {
+ "packets_in": 0,
+ "packets_fwd": 0,
+ "packets_dropped": 0,
+ }
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+
+class SampleVNFTrafficGen(GenericTrafficGen):
+ """ Class providing file-like API for generic traffic generator """
+
+ APP_NAME = 'Sample'
+ RUN_WAIT = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+ self.name = "tgen__1" # name in topology file
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ClientResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.runs_traffic = True
+ self.traffic_finished = False
+ self.tg_port_pairs = None
+ self._tg_process = None
+ self._traffic_process = None
+
+ def _start_server(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.resource_helper.generate_cfg()
+ self.setup_helper.setup_vnf_environment()
+ self.resource_helper.setup()
+
+ LOG.info("Starting %s server...", self.APP_NAME)
+ self._tg_process = Process(target=self._start_server)
+ self._tg_process.start()
+
+ def wait_for_instantiate(self):
+ return self._wait_for_process()
+
+ def _check_status(self):
+ raise NotImplementedError
+
+ def _wait_for_process(self):
+ while True:
+ if not self._tg_process.is_alive():
+ raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
+ LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
+ time.sleep(1)
+ status = self._check_status()
+ if status == 0:
+ LOG.info("%s TG Server is up and running.", self.APP_NAME)
+ return self._tg_process.exitcode
+
+ def _traffic_runner(self, traffic_profile):
+ LOG.info("Starting %s client...", self.APP_NAME)
+ self.resource_helper.run_traffic(traffic_profile)
+
+ def run_traffic(self, traffic_profile):
+ """ Generate traffic on the wire according to the given params.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Mandatory.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ self._traffic_process = Process(target=self._traffic_runner,
+ args=(traffic_profile,))
+ self._traffic_process.start()
+ # Wait for traffic process to start
+ while self.resource_helper.client_started.value == 0:
+ time.sleep(self.RUN_WAIT)
+
+ return self._traffic_process.is_alive()
+
+ def listen_traffic(self, traffic_profile):
+ """ Listen to traffic with the given parameters.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Optional.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ pass
+
+ def verify_traffic(self, traffic_profile):
+ """ Verify captured traffic after it has ended. Optional.
+
+ :param traffic_profile:
+ :return: dict
+ """
+ pass
+
+ def collect_kpi(self):
+ result = self.resource_helper.collect_kpi()
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+ def terminate(self):
+ """ After this method finishes, all traffic processes should stop. Mandatory.
+
+ :return: True/False
+ """
+ self.traffic_finished = True
+ if self._traffic_process is not None:
+ self._traffic_process.terminate()