Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf/vpe_vnf.py')
-rw-r--r--  yardstick/network_services/vnf_generic/vnf/vpe_vnf.py  545
1 file changed, 250 insertions(+), 295 deletions(-)
diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
index e9e80bdfb..310ab67cb 100644
--- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
@@ -15,313 +15,268 @@
from __future__ import absolute_import
from __future__ import print_function
-import tempfile
-import time
import os
import logging
import re
-from multiprocessing import Queue
-import multiprocessing
-import ipaddress
-import six
+import posixpath
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
-from yardstick.network_services.utils import provision_tool
-from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
-from yardstick.network_services.nfvi.resource import ResourceProfile
+from six.moves import configparser, zip
-LOG = logging.getLogger(__name__)
-VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}'
-CORES = ['0', '1', '2']
-WAIT_TIME = 20
+from yardstick.network_services.pipeline import PipelineRules
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+LOG = logging.getLogger(__name__)
-class VpeApproxVnf(GenericVNF):
+VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}"""
+
+VPE_COLLECT_KPI = """\
+Pkts in:\s(\d+)\r\n\
+\tPkts dropped by Pkts in:\s(\d+)\r\n\
+\tPkts dropped by AH:\s(\d+)\r\n\
+\tPkts dropped by other:\s(\d+)\
+"""
+
+
+class ConfigCreate(object):
+
+ @staticmethod
+ def vpe_tmq(config, index):
+ tm_q = 'TM{0}'.format(index)
+ config.add_section(tm_q)
+ config.set(tm_q, 'burst_read', '24')
+ config.set(tm_q, 'burst_write', '32')
+ config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg')
+ return config
+
+ def __init__(self, priv_ports, pub_ports, socket):
+ super(ConfigCreate, self).__init__()
+ self.sw_q = -1
+ self.sink_q = -1
+ self.n_pipeline = 1
+ self.priv_ports = priv_ports
+ self.pub_ports = pub_ports
+ self.pipeline_per_port = 9
+ self.socket = socket
+
+ def vpe_initialize(self, config):
+ config.add_section('EAL')
+ config.set('EAL', 'log_level', '0')
+
+ config.add_section('PIPELINE0')
+ config.set('PIPELINE0', 'type', 'MASTER')
+ config.set('PIPELINE0', 'core', 's%sC0' % self.socket)
+
+ config.add_section('MEMPOOL0')
+ config.set('MEMPOOL0', 'pool_size', '256K')
+
+ config.add_section('MEMPOOL1')
+ config.set('MEMPOOL1', 'pool_size', '2M')
+ return config
+
+ def vpe_rxq(self, config):
+ for port in self.pub_ports:
+ new_section = 'RXQ{0}.0'.format(port)
+ config.add_section(new_section)
+ config.set(new_section, 'mempool', 'MEMPOOL1')
+
+ return config
+
+ def get_sink_swq(self, parser, pipeline, k, index):
+ sink = ""
+ pktq = parser.get(pipeline, k)
+ if "SINK" in pktq:
+ self.sink_q += 1
+ sink = " SINK{0}".format(self.sink_q)
+ if "TM" in pktq:
+ sink = " TM{0}".format(index)
+ pktq = "SWQ{0}{1}".format(self.sw_q, sink)
+ return pktq
+
+ def vpe_upstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_upstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ if k == "pktq_in":
+ index = intf['index']
+ if "RXQ" in v:
+ value = "RXQ{0}.0".format(index)
+ else:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ elif k == "pktq_out":
+ index = intf['peer_intf']['index']
+ if "TXQ" in v:
+ value = "TXQ{0}.0".format(index)
+ else:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def vpe_downstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_downstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ index = intf['dpdk_port_num']
+ peer_index = intf['peer_intf']['dpdk_port_num']
+
+ if k == "pktq_in":
+ if "RXQ" not in v:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "RXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "RXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ if k == "pktq_out":
+ if "TXQ" not in v:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "TXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "TXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def create_vpe_config(self, vnf_cfg):
+ config = configparser.ConfigParser()
+ vpe_cfg = os.path.join("/tmp/vpe_config")
+ with open(vpe_cfg, 'w') as cfg_file:
+ config = self.vpe_initialize(config)
+ config = self.vpe_rxq(config)
+ config.write(cfg_file)
+ for index, priv_port in enumerate(self.priv_ports):
+ config = self.vpe_upstream(vnf_cfg, priv_port)
+ config.write(cfg_file)
+ config = self.vpe_downstream(vnf_cfg, priv_port)
+ config = self.vpe_tmq(config, index)
+ config.write(cfg_file)
+
+ def generate_vpe_script(self, interfaces):
+ rules = PipelineRules(pipeline_id=1)
+ for priv_port, pub_port in zip(self.priv_ports, self.pub_ports):
+ priv_intf = interfaces[priv_port]["virtual-interface"]
+ pub_intf = interfaces[pub_port]["virtual-interface"]
+
+ dst_port0_ip = priv_intf["dst_ip"]
+ dst_port1_ip = pub_intf["dst_ip"]
+ dst_port0_mac = priv_intf["dst_mac"]
+ dst_port1_mac = pub_intf["dst_mac"]
+
+ rules.add_firewall_script(dst_port0_ip)
+ rules.next_pipeline()
+ rules.add_flow_classification_script()
+ rules.next_pipeline()
+ rules.add_flow_action()
+ rules.next_pipeline()
+ rules.add_flow_action2()
+ rules.next_pipeline()
+ rules.add_route_script(dst_port1_ip, dst_port1_mac)
+ rules.next_pipeline()
+ rules.add_route_script2(dst_port0_ip, dst_port0_mac)
+ rules.next_pipeline(num=4)
+
+ return rules.get_string()
+
+
+class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ CFG_CONFIG = "/tmp/vpe_config"
+ CFG_SCRIPT = "/tmp/vpe_script"
+ CORES = ['0', '1', '2', '3', '4', '5']
+ PIPELINE_COMMAND = VPE_PIPELINE_COMMAND
+
+ def build_config(self):
+ vpe_vars = {
+ "bin_path": self.ssh_helper.bin_path,
+ "socket": self.socket,
+ }
+
+ all_ports = []
+ priv_ports = []
+ pub_ports = []
+ for interface in self.vnfd_helper.interfaces:
+ all_ports.append(interface['name'])
+ vld_id = interface['virtual-interface']['vld_id']
+ if vld_id.startswith('private'):
+ priv_ports.append(interface)
+ elif vld_id.startswith('public'):
+ pub_ports.append(interface)
+
+ vpe_conf = ConfigCreate(priv_ports, pub_ports, self.socket)
+ vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg)
+
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ with open(self.CFG_CONFIG) as handle:
+ vpe_config = handle.read()
+
+ self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars))
+
+ vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces)
+ self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars))
+
+
+class VpeApproxVnf(SampleVNF):
""" This class handles vPE VNF model-driver definitions """
- def __init__(self, vnfd):
- super(VpeApproxVnf, self).__init__(vnfd)
- self.socket = None
- self.q_in = Queue()
- self.q_out = Queue()
- self.vnf_cfg = None
- self._vnf_process = None
- self.connection = None
- self.resource = None
-
- def _resource_collect_start(self):
- self.resource.initiate_systemagent(self.bin_path)
- self.resource.start()
+ APP_NAME = 'vPE_vnf'
+ APP_WORD = 'vpe'
+ COLLECT_KPI = VPE_COLLECT_KPI
+ WAIT_TIME = 20
- def _resource_collect_stop(self):
- self.resource.stop()
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = VpeApproxSetupEnvHelper
- def _collect_resource_kpi(self):
- result = {}
+ super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
- status = self.resource.check_if_sa_running("collectd")[0]
- if status:
- result = self.resource.amqp_collect_nfvi_kpi()
-
- result = {"core": result}
-
- return result
-
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
-
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
-
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
-
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
-
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
-
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
-
- def _get_cpu_sibling_list(self):
- cpu_topo = []
- for core in CORES:
- sys_cmd = \
- "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \
- % core
- cpuid = \
- self.connection.execute("awk -F: '{ print $1 }' < %s" %
- sys_cmd)[1]
- cpu_topo += \
- [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')]
-
- return [cpu.strip() for cpu in cpu_topo]
-
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(VpeApproxVnf, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg']
-
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
-
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
-
- self.setup_vnf_environment(self.connection)
-
- cores = self._get_cpu_sibling_list()
- self.resource = ResourceProfile(self.vnfd, cores)
-
- self.connection.execute("pkill vPE_vnf")
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- self.socket = \
- next((0 for v in interfaces
- if v['virtual-interface']["vpci"][5] == "0"), 1)
-
- bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
- for vpci in bound_pci:
- self.connection.execute(
- "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci))
- queue_wrapper = \
- QueueFileWrapper(self.q_in, self.q_out, "pipeline>")
- self._vnf_process = multiprocessing.Process(target=self._run_vpe,
- args=(queue_wrapper,
- vnf_cfg,))
- self._vnf_process.start()
- buf = []
- time.sleep(WAIT_TIME) # Give some time for config to load
- while True:
- message = ''
- while self.q_out.qsize() > 0:
- buf.append(self.q_out.get())
- message = ''.join(buf)
- if "pipeline>" in message:
- LOG.info("VPE VNF is up and running.")
- queue_wrapper.clear()
- self._resource_collect_start()
- return self._vnf_process.exitcode
- if "PANIC" in message:
- raise RuntimeError("Error starting vPE VNF.")
-
- LOG.info("Waiting for VNF to start.. ")
- time.sleep(3)
- if not self._vnf_process.is_alive():
- raise RuntimeError("vPE VNF process died.")
-
- def _get_ports_gateway(self, name):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
-
- for route in routing_table:
- if name == route['if']:
- return route['gateway']
-
- def terminate(self):
- self.execute_command("quit")
- if self._vnf_process:
- self._vnf_process.terminate()
-
- def _run_vpe(self, filewrapper, vnf_cfg):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- port0_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"],
- interfaces[0]["virtual-interface"]["netmask"])))
- port1_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"],
- interfaces[1]["virtual-interface"]["netmask"])))
- dst_port0_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"],
- interfaces[0]["virtual-interface"]["netmask"]))
- dst_port1_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"],
- interfaces[1]["virtual-interface"]["netmask"]))
-
- vpe_vars = {"port0_local_ip": port0_ip.ip.exploded,
- "port0_dst_ip": dst_port0_ip.ip.exploded,
- "port0_local_ip_hex":
- self._ip_to_hex(port0_ip.ip.exploded),
- "port0_prefixlen": port0_ip.network.prefixlen,
- "port0_netmask": port0_ip.network.netmask.exploded,
- "port0_netmask_hex":
- self._ip_to_hex(port0_ip.network.netmask.exploded),
- "port0_local_mac":
- interfaces[0]["virtual-interface"]["local_mac"],
- "port0_dst_mac":
- interfaces[0]["virtual-interface"]["dst_mac"],
- "port0_gateway":
- self._get_ports_gateway(interfaces[0]["name"]),
- "port0_local_network":
- port0_ip.network.network_address.exploded,
- "port0_prefix": port0_ip.network.prefixlen,
- "port1_local_ip": port1_ip.ip.exploded,
- "port1_dst_ip": dst_port1_ip.ip.exploded,
- "port1_local_ip_hex":
- self._ip_to_hex(port1_ip.ip.exploded),
- "port1_prefixlen": port1_ip.network.prefixlen,
- "port1_netmask": port1_ip.network.netmask.exploded,
- "port1_netmask_hex":
- self._ip_to_hex(port1_ip.network.netmask.exploded),
- "port1_local_mac":
- interfaces[1]["virtual-interface"]["local_mac"],
- "port1_dst_mac":
- interfaces[1]["virtual-interface"]["dst_mac"],
- "port1_gateway":
- self._get_ports_gateway(interfaces[1]["name"]),
- "port1_local_network":
- port1_ip.network.network_address.exploded,
- "port1_prefix": port1_ip.network.prefixlen,
- "port0_local_ip6": self._get_port0localip6(),
- "port1_local_ip6": self._get_port1localip6(),
- "port0_prefixlen6": self._get_port0prefixlen6(),
- "port1_prefixlen6": self._get_port1prefixlen6(),
- "port0_gateway6": self._get_port0gateway6(),
- "port1_gateway6": self._get_port1gateway6(),
- "port0_dst_ip_hex6": self._get_port0localip6(),
- "port1_dst_ip_hex6": self._get_port1localip6(),
- "port0_dst_netmask_hex6": self._get_port0prefixlen6(),
- "port1_dst_netmask_hex6": self._get_port1prefixlen6(),
- "bin_path": self.bin_path,
- "socket": self.socket}
-
- for cfg in os.listdir(vnf_cfg):
- vpe_config = ""
- with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg:
- vpe_config = vpe_cfg.read()
-
- self._provide_config_file(cfg, vpe_config, vpe_vars)
-
- LOG.info("Provision and start the vPE")
- tool_path = provision_tool(self.connection,
- os.path.join(self.bin_path, "vPE_vnf"))
- cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config",
- script="/tmp/vpe_script",
- tool_path=tool_path)
- self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper,
- keep_stdin_open=True, pty=True)
-
- def _provide_config_file(self, prefix, template, args):
- cfg, cfg_content = tempfile.mkstemp()
- cfg = os.fdopen(cfg, "w+")
- cfg.write(template.format(**args))
- cfg.close()
- cfg_file = "/tmp/%s" % prefix
- self.connection.put(cfg_content, cfg_file)
- return cfg_file
-
- def execute_command(self, cmd):
- ''' send cmd to vnf process '''
- LOG.info("VPE command: %s", cmd)
- output = []
- if self.q_in:
- self.q_in.put(cmd + "\r\n")
- time.sleep(3)
- while self.q_out.qsize() > 0:
- output.append(self.q_out.get())
- return "".join(output)
+ def get_stats(self, *args, **kwargs):
+ raise NotImplementedError
def collect_kpi(self):
- result = self.get_stats_vpe()
- collect_stats = self._collect_resource_kpi()
- result["collect_stats"] = collect_stats
- LOG.debug("vPE collet Kpis: %s", result)
- return result
-
- def get_stats_vpe(self):
- ''' get vpe statistics '''
- result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0,
- 'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0}
- up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0',
- 'p 5 stats port out 1']
- down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0']
- pattern = \
- "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \
- "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)"
-
- for cmd in up_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_up_stream"] = \
- result.get("pkt_in_up_stream", 0) + int(match.group(1))
- result["pkt_drop_up_stream"] = \
- result.get("pkt_drop_up_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
-
- for cmd in down_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_down_stream"] = \
- result.get("pkt_in_down_stream", 0) + int(match.group(1))
- result["pkt_drop_down_stream"] = \
- result.get("pkt_drop_down_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
+ result = {
+ 'pkt_in_up_stream': 0,
+ 'pkt_drop_up_stream': 0,
+ 'pkt_in_down_stream': 0,
+ 'pkt_drop_down_stream': 0,
+ 'collect_stats': self.resource_helper.collect_kpi(),
+ }
+
+ indexes_in = [1]
+ indexes_drop = [2, 3]
+ command = 'p {0} stats port {1} 0'
+ for index, direction in ((5, 'up'), (9, 'down')):
+ key_in = "pkt_in_{0}_stream".format(direction)
+ key_drop = "pkt_drop_{0}_stream".format(direction)
+ for mode in ('in', 'out'):
+ stats = self.vnf_execute(command.format(index, mode))
+ match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if not match:
+ continue
+ result[key_in] += sum(int(match.group(x)) for x in indexes_in)
+ result[key_drop] += sum(int(match.group(x)) for x in indexes_drop)
+
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
return result
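
Illustration (not part of the patch): a minimal, hypothetical sketch of how the new VPE_COLLECT_KPI pattern is applied in collect_kpi() to the reply of a "p <pipeline> stats port ..." command. The sample reply text below is assumed for demonstration only; group 1 feeds the pkt_in_*_stream counters and groups 2-3 feed pkt_drop_*_stream, matching the indexes_in/indexes_drop lists in the patch.

import re

# Same pattern as VPE_COLLECT_KPI above: \s and \d are regex escapes,
# while \r, \n and \t are literal control characters in the CLI output.
VPE_COLLECT_KPI = (
    "Pkts in:\\s(\\d+)\r\n"
    "\tPkts dropped by Pkts in:\\s(\\d+)\r\n"
    "\tPkts dropped by AH:\\s(\\d+)\r\n"
    "\tPkts dropped by other:\\s(\\d+)"
)

# Assumed example of what vnf_execute('p 5 stats port in 0') might return.
sample = ("Pkts in: 1000\r\n"
          "\tPkts dropped by Pkts in: 0\r\n"
          "\tPkts dropped by AH: 2\r\n"
          "\tPkts dropped by other: 3")

match = re.search(VPE_COLLECT_KPI, sample, re.MULTILINE)
if match:
    pkt_in = int(match.group(1))                         # -> pkt_in_{up,down}_stream
    pkt_drop = sum(int(match.group(i)) for i in (2, 3))  # -> pkt_drop_{up,down}_stream
    print(pkt_in, pkt_drop)  # 1000 2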