path: root/yardstick/network_services
diff options
Diffstat (limited to 'yardstick/network_services')
15 files changed, 2673 insertions, 1133 deletions
diff --git a/yardstick/network_services/helpers/__init__.py b/yardstick/network_services/helpers/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/yardstick/network_services/helpers/__init__.py
diff --git a/yardstick/network_services/helpers/cpu.py b/yardstick/network_services/helpers/cpu.py
new file mode 100644
index 000000000..a5ba6c31e
--- /dev/null
+++ b/yardstick/network_services/helpers/cpu.py
@@ -0,0 +1,76 @@
+# Copyright (c) 2016-2017 Intel Corporation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+class CpuSysCores(object):
+ def __init__(self, connection=""):
+ self.core_map = {}
+ self.connection = connection
+ def _open_cpuinfo(self):
+ lines = []
+ lines = self.connection.execute("cat /proc/cpuinfo")[1].split(u'\n')
+ return lines
+ def _get_core_details(self, lines):
+ core_details = []
+ core_lines = {}
+ for line in lines:
+ if line.strip():
+ name, value = line.split(":", 1)
+ core_lines[name.strip()] = value.strip()
+ else:
+ core_details.append(core_lines)
+ core_lines = {}
+ return core_details
+ def get_core_socket(self):
+ lines = self.connection.execute("lscpu")[1].split(u'\n')
+ num_cores = self._get_core_details(lines)
+ for num in num_cores:
+ self.core_map["cores_per_socket"] = num["Core(s) per socket"]
+ self.core_map["thread_per_core"] = num["Thread(s) per core"]
+ lines = self._open_cpuinfo()
+ core_details = self._get_core_details(lines)
+ for core in core_details:
+ for k, v in core.items():
+ if k == "physical id":
+ if core["physical id"] not in self.core_map:
+ self.core_map[core['physical id']] = []
+ self.core_map[core['physical id']].append(
+ core["processor"])
+ return self.core_map
+ def validate_cpu_cfg(self, vnf_cfg=None):
+ if vnf_cfg is None:
+ vnf_cfg = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1
+ }
+ if self.core_map["thread_per_core"] == 1 and \
+ vnf_cfg["worker_config"] == "1C/2T":
+ return -1
+ if vnf_cfg['lb_config'] == 'SW':
+ num_cpu = int(vnf_cfg["worker_threads"]) + 5
+ if int(self.core_map["cores_per_socket"]) < num_cpu:
+ return -1
+ return 0
diff --git a/yardstick/network_services/helpers/samplevnf_helper.py b/yardstick/network_services/helpers/samplevnf_helper.py
new file mode 100644
index 000000000..1eefc5ffa
--- /dev/null
+++ b/yardstick/network_services/helpers/samplevnf_helper.py
@@ -0,0 +1,639 @@
+# Copyright (c) 2016-2017 Intel Corporation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import absolute_import
+import ipaddress
+import logging
+import os
+import sys
+from collections import OrderedDict, defaultdict
+from itertools import chain
+import six
+from six.moves.configparser import ConfigParser
+from yardstick.common.utils import ip_to_hex
+LOG = logging.getLogger(__name__)
+link {0} down
+link {0} config {1} {2}
+link {0} up
+p action add {0} accept
+p action add {0} fwd
+p action add {0} count
+p action add {0} accept
+p action add {0} fwd
+p action add {0} count
+p action add {0} conntrack
+# This sets up a basic passthrough with no rules
+class MultiPortConfig(object):
+ HW_LB = "HW"
+ @staticmethod
+ def float_x_plus_one_tenth_of_y(x, y):
+ return float(x) + float(y) / 10.0
+ @staticmethod
+ def make_str(base, iterator):
+ return ' '.join((base.format(x) for x in iterator))
+ @classmethod
+ def make_range_str(cls, base, start, stop=0, offset=0):
+ if offset and not stop:
+ stop = start + offset
+ return cls.make_str(base, range(start, stop))
+ @staticmethod
+ def parser_get(parser, section, key, default=None):
+ if parser.has_option(section, key):
+ return parser.get(section, key)
+ return default
+ @staticmethod
+ def make_ip_addr(ip, mask_len):
+ try:
+ return ipaddress.ip_interface(six.text_type('/'.join([ip, mask_len])))
+ except ValueError:
+ # None so we can skip later
+ return None
+ @classmethod
+ def validate_ip_and_prefixlen(cls, ip_addr, prefixlen):
+ ip_addr = cls.make_ip_addr(ip_addr, prefixlen)
+ return ip_addr.ip.exploded, ip_addr.network.prefixlen
+ def __init__(self, topology_file, config_tpl, tmp_file, interfaces=None,
+ vnf_type='CGNAT', lb_count=2, worker_threads=3,
+ worker_config='1C/1T', lb_config='SW', socket=0):
+ super(MultiPortConfig, self).__init__()
+ self.topology_file = topology_file
+ self.worker_config = worker_config.split('/')[1].lower()
+ self.worker_threads = self.get_worker_threads(worker_threads)
+ self.vnf_type = vnf_type
+ self.pipe_line = 0
+ self.interfaces = interfaces if interfaces else {}
+ self.networks = {}
+ self.write_parser = ConfigParser()
+ self.read_parser = ConfigParser()
+ self.read_parser.read(config_tpl)
+ self.master_core = self.read_parser.get("PIPELINE0", "core")
+ self.master_tpl = self.get_config_tpl_data('MASTER')
+ self.arpicmp_tpl = self.get_config_tpl_data('ARPICMP')
+ self.txrx_tpl = self.get_config_tpl_data('TXRX')
+ self.loadb_tpl = self.get_config_tpl_data('LOADB')
+ self.vnf_tpl = self.get_config_tpl_data(vnf_type)
+ self.swq = 0
+ self.lb_count = int(lb_count)
+ self.lb_config = lb_config
+ self.tmp_file = os.path.join("/tmp", tmp_file)
+ self.pktq_out_os = []
+ self.socket = socket
+ self.start_core = ""
+ self.pipeline_counter = ""
+ self.txrx_pipeline = ""
+ self.port_pair_list = []
+ self.lb_to_port_pair_mapping = {}
+ self.init_eal()
+ self.lb_index = None
+ self.mul = 0
+ self.port_pairs = []
+ self.port_pair_list = []
+ self.ports_len = 0
+ self.prv_que_handler = None
+ self.vnfd = None
+ self.rules = None
+ self.pktq_out = ''
+ @staticmethod
+ def gen_core(core):
+ # return "s{}c{}".format(self.socket, core)
+ # don't use sockets for VNFs, because we don't want to have to
+ # adjust VM CPU topology. It is virtual anyway
+ return str(core)
+ def make_port_pairs_iter(self, operand, iterable):
+ return (operand(x[-1], y) for y in iterable for x in chain(*self.port_pairs))
+ def make_range_port_pairs_iter(self, operand, start, end):
+ return self.make_port_pairs_iter(operand, range(start, end))
+ def init_eal(self):
+ vpci = [v['virtual-interface']["vpci"] for v in self.interfaces]
+ with open(self.tmp_file, 'w') as fh:
+ fh.write('[EAL]\n')
+ for item in vpci:
+ fh.write('w = {0}\n'.format(item))
+ fh.write('\n')
+ def update_timer(self):
+ timer_tpl = self.get_config_tpl_data('TIMER')
+ timer_tpl['core'] = self.gen_core(self.start_core)
+ self.update_write_parser(timer_tpl)
+ self.start_core += 1
+ def get_config_tpl_data(self, type_value):
+ for section in self.read_parser.sections():
+ if self.read_parser.has_option(section, 'type'):
+ if type_value == self.read_parser.get(section, 'type'):
+ tpl = OrderedDict(self.read_parser.items(section))
+ return tpl
+ def get_txrx_tpl_data(self, value):
+ for section in self.read_parser.sections():
+ if self.read_parser.has_option(section, 'pipeline_txrx_type'):
+ if value == self.read_parser.get(section, 'pipeline_txrx_type'):
+ tpl = OrderedDict(self.read_parser.items(section))
+ return tpl
+ def init_write_parser_template(self, type_value='ARPICMP'):
+ for section in self.read_parser.sections():
+ if type_value == self.parser_get(self.read_parser, section, 'type', object()):
+ self.start_core = self.read_parser.getint(section, 'core')
+ self.pipeline_counter = self.read_parser.getint(section, 'core')
+ self.txrx_pipeline = self.read_parser.getint(section, 'core')
+ return
+ self.write_parser.add_section(section)
+ for name, value in self.read_parser.items(section):
+ self.write_parser.set(section, name, value)
+ def update_write_parser(self, data):
+ section = "PIPELINE{0}".format(self.pipeline_counter)
+ self.write_parser.add_section(section)
+ for name, value in data.items():
+ self.write_parser.set(section, name, value)
+ def get_worker_threads(self, worker_threads):
+ if self.worker_config == '1t':
+ return worker_threads
+ else:
+ return worker_threads - worker_threads % 2
+ def generate_next_core_id(self):
+ if self.worker_config == '1t':
+ self.start_core += 1
+ return
+ try:
+ self.start_core = 'h{}'.format(int(self.start_core))
+ except ValueError:
+ self.start_core = int(self.start_core[:-1]) + 1
+ @staticmethod
+ def get_port_pairs(interfaces):
+ port_pair_list = []
+ networks = defaultdict(list)
+ for private_intf in interfaces:
+ vintf = private_intf['virtual-interface']
+ networks[vintf['vld_id']].append(vintf)
+ for name, net in networks.items():
+ # partition returns a tuple
+ parts = list(name.partition('private'))
+ if parts[0]:
+ # 'private' was not in or not leftmost in the string
+ continue
+ parts[1] = 'public'
+ public_id = ''.join(parts)
+ for private_intf in net:
+ try:
+ public_peer_intfs = networks[public_id]
+ except KeyError:
+ LOG.warning("private network without peer %s, %s not found", name, public_id)
+ continue
+ for public_intf in public_peer_intfs:
+ port_pair = private_intf["ifname"], public_intf["ifname"]
+ port_pair_list.append(port_pair)
+ return port_pair_list, networks
+ def get_lb_count(self):
+ self.lb_count = int(min(len(self.port_pair_list), self.lb_count))
+ def generate_lb_to_port_pair_mapping(self):
+ self.lb_to_port_pair_mapping = defaultdict(int)
+ port_pair_count = len(self.port_pair_list)
+ lb_pair_count = int(port_pair_count / self.lb_count)
+ for i in range(self.lb_count):
+ self.lb_to_port_pair_mapping[i + 1] = lb_pair_count
+ for i in range(port_pair_count % self.lb_count):
+ self.lb_to_port_pair_mapping[i + 1] += 1
+ def set_priv_to_pub_mapping(self):
+ return "".join(str(y) for y in [(int(x[0][-1]), int(x[1][-1])) for x in
+ self.port_pair_list])
+ def set_priv_que_handler(self):
+ # iterated twice, can't be generator
+ priv_to_pub_map = [(int(x[0][-1]), int(x[1][-1])) for x in self.port_pairs]
+ # must be list to use .index()
+ port_list = list(chain.from_iterable(priv_to_pub_map))
+ priv_ports = (x[0] for x in priv_to_pub_map)
+ self.prv_que_handler = '({})'.format(
+ ",".join((str(port_list.index(x)) for x in priv_ports)))
+ def generate_arp_route_tbl(self):
+ arp_config = []
+ arp_route_tbl_tmpl = "({port0_dst_ip_hex},{port0_netmask_hex},{port_num}," \
+ "{next_hop_ip_hex})"
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ port_num = int(port[-1])
+ interface = self.interfaces[port_num]
+ # port0_ip = ipaddress.ip_interface(six.text_type(
+ # "%s/%s" % (interface["virtual-interface"]["local_ip"],
+ # interface["virtual-interface"]["netmask"])))
+ dst_port0_ip = \
+ ipaddress.ip_interface(six.text_type(
+ "%s/%s" % (interface["virtual-interface"]["dst_ip"],
+ interface["virtual-interface"]["netmask"])))
+ arp_vars = {
+ "port0_dst_ip_hex": ip_to_hex(dst_port0_ip.ip.exploded),
+ "port0_netmask_hex": ip_to_hex(dst_port0_ip.network.netmask.exploded),
+ "port_num": port_num,
+ # next hop is dst in this case
+ "next_hop_ip_hex": ip_to_hex(dst_port0_ip.ip.exploded),
+ }
+ arp_config.append(arp_route_tbl_tmpl.format(**arp_vars))
+ return ' '.join(arp_config)
+ def generate_arpicmp_data(self):
+ swq_in_str = self.make_range_str('SWQ{}', self.swq, offset=self.lb_count)
+ self.swq += self.lb_count
+ swq_out_str = self.make_range_str('SWQ{}', self.swq, offset=self.lb_count)
+ self.swq += self.lb_count
+ mac_iter = (self.interfaces[int(x[-1])]['virtual-interface']['local_mac']
+ for port_pair in self.port_pair_list for x in port_pair)
+ pktq_in_iter = ('RXQ{}'.format(float(x[0][-1])) for x in self.port_pair_list)
+ arpicmp_data = {
+ 'core': self.gen_core(self.start_core),
+ 'pktq_in': swq_in_str,
+ 'pktq_out': swq_out_str,
+ 'ports_mac_list': ' '.join(mac_iter),
+ 'pktq_in_prv': ' '.join(pktq_in_iter),
+ 'prv_to_pub_map': self.set_priv_to_pub_mapping(),
+ 'arp_route_tbl': self.generate_arp_route_tbl(),
+ # can't use empty string, defaul to ()
+ 'nd_route_tbl': "()",
+ }
+ self.pktq_out_os = swq_out_str.split(' ')
+ # why?
+ if self.lb_config == self.HW_LB:
+ arpicmp_data['pktq_in'] = swq_in_str
+ self.swq = 0
+ return arpicmp_data
+ def generate_final_txrx_data(self):
+ swq_start = self.swq - self.ports_len * self.worker_threads
+ txq_start = 0
+ txq_end = self.worker_threads
+ pktq_out_iter = self.make_range_port_pairs_iter(self.float_x_plus_one_tenth_of_y,
+ txq_start, txq_end)
+ swq_str = self.make_range_str('SWQ{}', swq_start, self.swq)
+ txq_str = self.make_str('TXQ{}', pktq_out_iter)
+ rxtx_data = {
+ 'pktq_in': swq_str,
+ 'pktq_out': txq_str,
+ 'pipeline_txrx_type': 'TXTX',
+ 'core': self.gen_core(self.start_core),
+ }
+ pktq_in = rxtx_data['pktq_in']
+ pktq_in = '{0} {1}'.format(pktq_in, self.pktq_out_os[self.lb_index - 1])
+ rxtx_data['pktq_in'] = pktq_in
+ self.pipeline_counter += 1
+ return rxtx_data
+ def generate_initial_txrx_data(self):
+ pktq_iter = self.make_range_port_pairs_iter(self.float_x_plus_one_tenth_of_y,
+ 0, self.worker_threads)
+ rxq_str = self.make_str('RXQ{}', pktq_iter)
+ swq_str = self.make_range_str('SWQ{}', self.swq, offset=self.ports_len)
+ txrx_data = {
+ 'pktq_in': rxq_str,
+ 'pktq_out': swq_str + ' SWQ{0}'.format(self.lb_index - 1),
+ 'pipeline_txrx_type': 'RXRX',
+ 'core': self.gen_core(self.start_core),
+ }
+ self.pipeline_counter += 1
+ return txrx_data
+ def generate_lb_data(self):
+ pktq_in = self.make_range_str('SWQ{}', self.swq, offset=self.ports_len)
+ self.swq += self.ports_len
+ offset = self.ports_len * self.worker_threads
+ pktq_out = self.make_range_str('SWQ{}', self.swq, offset=offset)
+ self.pktq_out = pktq_out.split()
+ self.swq += (self.ports_len * self.worker_threads)
+ lb_data = {
+ 'prv_que_handler': self.prv_que_handler,
+ 'pktq_in': pktq_in,
+ 'pktq_out': pktq_out,
+ 'n_vnf_threads': str(self.worker_threads),
+ 'core': self.gen_core(self.start_core),
+ }
+ self.pipeline_counter += 1
+ return lb_data
+ def generate_vnf_data(self):
+ if self.lb_config == self.HW_LB:
+ port_iter = self.make_port_pairs_iter(self.float_x_plus_one_tenth_of_y, [self.mul])
+ pktq_in = self.make_str('RXQ{}', port_iter)
+ self.mul += 1
+ port_iter = self.make_port_pairs_iter(self.float_x_plus_one_tenth_of_y, [self.mul])
+ pktq_out = self.make_str('TXQ{}', port_iter)
+ pipe_line_data = {
+ 'pktq_in': pktq_in,
+ 'pktq_out': pktq_out + ' SWQ{0}'.format(self.swq),
+ 'prv_que_handler': self.prv_que_handler,
+ 'core': self.gen_core(self.start_core),
+ }
+ self.swq += 1
+ else:
+ pipe_line_data = {
+ 'pktq_in': ' '.join((self.pktq_out.pop(0) for _ in range(self.ports_len))),
+ 'pktq_out': self.make_range_str('SWQ{}', self.swq, offset=self.ports_len),
+ 'prv_que_handler': self.prv_que_handler,
+ 'core': self.gen_core(self.start_core),
+ }
+ self.swq += self.ports_len
+ if self.vnf_type in ('ACL', 'VFW'):
+ pipe_line_data.pop('prv_que_handler')
+ if self.vnf_tpl.get('vnf_set'):
+ public_ip_port_range_list = self.vnf_tpl['public_ip_port_range'].split(':')
+ ip_in_hex = '{:x}'.format(int(public_ip_port_range_list[0], 16) + self.lb_index - 1)
+ public_ip_port_range_list[0] = ip_in_hex
+ self.vnf_tpl['public_ip_port_range'] = ':'.join(public_ip_port_range_list)
+ self.pipeline_counter += 1
+ return pipe_line_data
+ def generate_config_data(self):
+ self.init_write_parser_template()
+ # use master core for master, don't use self.start_core
+ self.write_parser.set('PIPELINE0', 'core', self.gen_core(self.master_core))
+ arpicmp_data = self.generate_arpicmp_data()
+ self.arpicmp_tpl.update(arpicmp_data)
+ self.update_write_parser(self.arpicmp_tpl)
+ self.start_core += 1
+ if self.vnf_type == 'CGNAPT':
+ self.pipeline_counter += 1
+ self.update_timer()
+ for lb in self.lb_to_port_pair_mapping:
+ self.lb_index = lb
+ self.mul = 0
+ port_pair_count = self.lb_to_port_pair_mapping[lb]
+ if not self.port_pair_list:
+ continue
+ self.port_pairs = self.port_pair_list[:port_pair_count]
+ self.port_pair_list = self.port_pair_list[port_pair_count:]
+ self.ports_len = port_pair_count * 2
+ self.set_priv_que_handler()
+ if self.lb_config == 'SW':
+ txrx_data = self.generate_initial_txrx_data()
+ self.txrx_tpl.update(txrx_data)
+ self.update_write_parser(self.txrx_tpl)
+ self.start_core += 1
+ lb_data = self.generate_lb_data()
+ self.loadb_tpl.update(lb_data)
+ self.update_write_parser(self.loadb_tpl)
+ self.start_core += 1
+ for i in range(self.worker_threads):
+ vnf_data = self.generate_vnf_data()
+ if not self.vnf_tpl:
+ self.vnf_tpl = {}
+ self.vnf_tpl.update(vnf_data)
+ self.update_write_parser(self.vnf_tpl)
+ try:
+ self.vnf_tpl.pop('vnf_set')
+ except KeyError:
+ pass
+ else:
+ self.vnf_tpl.pop('public_ip_port_range')
+ self.generate_next_core_id()
+ if self.lb_config == 'SW':
+ txrx_data = self.generate_final_txrx_data()
+ self.txrx_tpl.update(txrx_data)
+ self.update_write_parser(self.txrx_tpl)
+ self.start_core += 1
+ self.vnf_tpl = self.get_config_tpl_data(self.vnf_type)
+ def generate_config(self):
+ self.port_pair_list, self.networks = self.get_port_pairs(self.interfaces)
+ self.get_lb_count()
+ self.generate_lb_to_port_pair_mapping()
+ self.generate_config_data()
+ self.write_parser.write(sys.stdout)
+ with open(self.tmp_file, 'a') as tfh:
+ self.write_parser.write(tfh)
+ def generate_link_config(self):
+ link_configs = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ port = port[-1]
+ virtual_interface = self.interfaces[int(port)]["virtual-interface"]
+ local_ip = virtual_interface["local_ip"]
+ netmask = virtual_interface["netmask"]
+ port_ip, prefix_len = self.validate_ip_and_prefixlen(local_ip, netmask)
+ link_configs.append(LINK_CONFIG_TEMPLATE.format(port, port_ip, prefix_len))
+ return ''.join(link_configs)
+ def get_route_data(self, src_key, data_key, port):
+ route_list = self.vnfd['vdu'][0].get(src_key, [])
+ return next((route[data_key] for route in route_list if route['if'] == port), None)
+ def get_ports_gateway(self, port):
+ return self.get_route_data('routing_table', 'gateway', port)
+ def get_ports_gateway6(self, port):
+ return self.get_route_data('nd_route_tbl', 'gateway', port)
+ def get_netmask_gateway(self, port):
+ return self.get_route_data('routing_table', 'netmask', port)
+ def get_netmask_gateway6(self, port):
+ return self.get_route_data('nd_route_tbl', 'netmask', port)
+ def generate_arp_config(self):
+ arp_config = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ gateway = self.get_ports_gateway(port)
+ # omit entries with no gateway
+ if not gateway:
+ continue
+ dst_mac = self.interfaces[int(port[-1])]["virtual-interface"]["dst_mac"]
+ arp_config.append((port[-1], gateway, dst_mac, self.txrx_pipeline))
+ return '\n'.join(('p {3} arpadd {0} {1} {2}'.format(*values) for values in arp_config))
+ def generate_arp_config6(self):
+ arp_config6 = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ gateway6 = self.get_ports_gateway6(port)
+ # omit entries with no gateway
+ if not gateway6:
+ continue
+ dst_mac6 = self.interfaces[int(port[-1])]["virtual-interface"]["dst_mac"]
+ arp_config6.append((port[-1], gateway6, dst_mac6, self.txrx_pipeline))
+ return '\n'.join(('p {3} arpadd {0} {1} {2}'.format(*values) for values in arp_config6))
+ def generate_action_config(self):
+ port_list = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ port_list.append(port[-1])
+ if self.vnf_type == "VFW":
+ else:
+ template = ACTION_TEMPLATE
+ return ''.join((template.format(port) for port in port_list))
+ def get_ip_from_port(self, port):
+ return self.make_ip_addr(self.get_ports_gateway(port), self.get_netmask_gateway(port))
+ def get_ip_and_prefixlen_from_ip_of_port(self, port):
+ ip_addr = self.get_ip_from_port(port)
+ # handle cases with no gateway
+ if ip_addr:
+ return ip_addr.ip.exploded, ip_addr.network.prefixlen
+ else:
+ return None, None
+ def generate_rule_config(self):
+ cmd = 'acl' if self.vnf_type == "ACL" else "vfw"
+ rules_config = self.rules if self.rules else ''
+ new_rules = []
+ new_ipv6_rules = []
+ pattern = 'p {0} add {1} {2} {3} {4} {5} 0 65535 0 65535 0 0 {6}'
+ for port_pair in self.port_pair_list:
+ src_port = int(port_pair[0][-1])
+ dst_port = int(port_pair[1][-1])
+ src_ip, src_prefix_len = self.get_ip_and_prefixlen_from_ip_of_port(port_pair[0])
+ dst_ip, dst_prefix_len = self.get_ip_and_prefixlen_from_ip_of_port(port_pair[1])
+ # ignore entires with empty values
+ if all((src_ip, src_prefix_len, dst_ip, dst_prefix_len)):
+ new_rules.append((cmd, self.txrx_pipeline, src_ip, src_prefix_len,
+ dst_ip, dst_prefix_len, dst_port))
+ new_rules.append((cmd, self.txrx_pipeline, dst_ip, dst_prefix_len,
+ src_ip, src_prefix_len, src_port))
+ src_ip = self.get_ports_gateway6(port_pair[0])
+ src_prefix_len = self.get_netmask_gateway6(port_pair[0])
+ dst_ip = self.get_ports_gateway6(port_pair[1])
+ dst_prefix_len = self.get_netmask_gateway6(port_pair[0])
+ # ignore entires with empty values
+ if all((src_ip, src_prefix_len, dst_ip, dst_prefix_len)):
+ new_ipv6_rules.append((cmd, self.txrx_pipeline, src_ip, src_prefix_len,
+ dst_ip, dst_prefix_len, dst_port))
+ new_ipv6_rules.append((cmd, self.txrx_pipeline, dst_ip, dst_prefix_len,
+ src_ip, src_prefix_len, src_port))
+ acl_apply = "\np %s applyruleset" % cmd
+ new_rules_config = '\n'.join(pattern.format(*values) for values
+ in chain(new_rules, new_ipv6_rules))
+ return ''.join([rules_config, new_rules_config, acl_apply])
+ def generate_script_data(self):
+ self.port_pair_list, self.networks = self.get_port_pairs(self.interfaces)
+ self.get_lb_count()
+ script_data = {
+ 'link_config': self.generate_link_config(),
+ 'arp_config': self.generate_arp_config(),
+ 'arp_config6': self.generate_arp_config6(),
+ 'actions': '',
+ 'rules': '',
+ }
+ if self.vnf_type in ('ACL', 'VFW'):
+ script_data.update({
+ 'actions': self.generate_action_config(),
+ 'rules': self.generate_rule_config(),
+ })
+ return script_data
+ def generate_script(self, vnfd, rules=None):
+ self.vnfd = vnfd
+ self.rules = rules
+ script_data = self.generate_script_data()
+ script = SCRIPT_TPL.format(**script_data)
+ if self.lb_config == self.HW_LB:
+ script += 'set fwd rxonly'
+ hwlb_tpl = """
+set_sym_hash_ena_per_port {0} enable
+set_hash_global_config {0} simple_xor ipv4-udp enable
+set_sym_hash_ena_per_port {1} enable
+set_hash_global_config {1} simple_xor ipv4-udp enable
+set_hash_input_set {0} ipv4-udp src-ipv4 udp-src-port add
+set_hash_input_set {1} ipv4-udp dst-ipv4 udp-dst-port add
+set_hash_input_set {0} ipv6-udp src-ipv6 udp-src-port add
+set_hash_input_set {1} ipv6-udp dst-ipv6 udp-dst-port add
+ for port_pair in self.port_pair_list:
+ script += hwlb_tpl.format(port_pair[0][-1], port_pair[1][-1])
+ return script
diff --git a/yardstick/network_services/pipeline.py b/yardstick/network_services/pipeline.py
new file mode 100644
index 000000000..d781ba0cd
--- /dev/null
+++ b/yardstick/network_services/pipeline.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2017 Intel Corporation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import absolute_import
+from __future__ import print_function
+import itertools
+from six.moves import zip
+FIREWALL_ADD_DEFAULT = "p {0} firewall add default 1"
+p {0} firewall add priority 1 ipv4 {1} 24 0 0 65535 0 65535 6 0xFF port 0"""
+p {0} flow add qinq 128 512 port 0 id 1
+p {0} flow add default 1"""
+ACTION_FLOW_BULK = "p {0} action flow bulk /tmp/action_bulk_512.txt"
+ACTION_DSCP_CLASS_COLOR = "p {0} action dscp {1} class {2} color {3}"
+ROUTE_ADD_DEFAULT = "p {0} route add default 1"
+ROUTE_ADD_ETHER_QINQ = 'p {0} route add {1} {2} port 0 ether {3} qinq 0 {4}'
+ROUTE_ADD_ETHER_MPLS = "p {0} route add {1} 21 port 0 ether {2} mpls 0:{3}"
+class PipelineRules(object):
+ def __init__(self, pipeline_id=0):
+ super(PipelineRules, self).__init__()
+ self.rule_list = []
+ self.pipeline_id = pipeline_id
+ def __str__(self):
+ return '\n'.join(self.rule_list)
+ def get_string(self):
+ return str(self)
+ def next_pipeline(self, num=1):
+ self.pipeline_id += num
+ def add_newline(self):
+ self.rule_list.append('')
+ def add_rule(self, base, *args):
+ self.rule_list.append(base.format(self.pipeline_id, *args))
+ def add_firewall_prio(self, ip):
+ self.add_rule(FIREWALL_ADD_PRIO, ip)
+ def add_firewall_script(self, ip):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ for i in range(256):
+ ip_addr[-2] = str(i)
+ ip = '.'.join(ip_addr)
+ self.add_firewall_prio(ip)
+ self.add_rule(FIREWALL_ADD_DEFAULT)
+ self.add_newline()
+ def add_flow_classification_script(self):
+ self.add_rule(FLOW_ADD_QINQ_RULES)
+ def add_flow_action(self):
+ self.add_rule(ACTION_FLOW_BULK)
+ def add_dscp_class_color(self, dscp, color):
+ self.add_rule(ACTION_DSCP_CLASS_COLOR, dscp, dscp % 4, color)
+ def add_flow_action2(self):
+ self.add_rule(ACTION_FLOW_BULK)
+ for dscp, color in zip(range(64), itertools.cycle('GYR')):
+ self.add_dscp_class_color(dscp, color)
+ def add_route_ether_mpls(self, ip, mac_addr, index):
+ self.add_rule(ROUTE_ADD_ETHER_MPLS, ip, mac_addr, index)
+ def add_route_script(self, ip, mac_addr):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ for index in range(0, 256, 8):
+ ip_addr[-2] = str(index)
+ ip = '.'.join(ip_addr)
+ self.add_route_ether_mpls(ip, mac_addr, index)
+ self.add_rule(ROUTE_ADD_DEFAULT)
+ self.add_newline()
+ def add_ether_qinq(self, ip, mask, mac_addr, index):
+ self.add_rule(ROUTE_ADD_ETHER_QINQ, ip, mask, mac_addr, index)
+ def add_route_script2(self, ip, mac_addr):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ mask = 24
+ for i in range(0, 256):
+ ip_addr[-2] = str(i)
+ ip = '.'.join(ip_addr)
+ self.add_ether_qinq(ip, mask, mac_addr, i)
+ self.add_rule(ROUTE_ADD_DEFAULT)
+ self.add_newline()
diff --git a/yardstick/network_services/traffic_profile/rfc2544.py b/yardstick/network_services/traffic_profile/rfc2544.py
index 99964d329..b07bc9d5a 100644
--- a/yardstick/network_services/traffic_profile/rfc2544.py
+++ b/yardstick/network_services/traffic_profile/rfc2544.py
@@ -17,6 +17,10 @@ from __future__ import absolute_import
from __future__ import division
import logging
+from stl.trex_stl_lib.trex_stl_client import STLStream
+from stl.trex_stl_lib.trex_stl_streams import STLFlowLatencyStats
+from stl.trex_stl_lib.trex_stl_streams import STLTXCont
from yardstick.network_services.traffic_profile.traffic_profile \
import TrexProfile
@@ -28,79 +32,175 @@ class RFC2544Profile(TrexProfile):
def __init__(self, traffic_generator):
super(RFC2544Profile, self).__init__(traffic_generator)
+ self.generator = None
self.max_rate = None
self.min_rate = None
+ self.ports = None
self.rate = 100
- self.tmp_drop = None
- self.tmp_throughput = None
- self.profile_data = None
- def execute(self, traffic_generator):
- ''' Generate the stream and run traffic on the given ports '''
- if self.first_run:
- self.profile_data = self.params.get('private', '')
- ports = [traffic_generator.my_ports[0]]
- traffic_generator.client.add_streams(self.get_streams(),
- ports=ports[0])
- profile_data = self.params.get('public', '')
- if profile_data:
- self.profile_data = profile_data
- ports.append(traffic_generator.my_ports[1])
- traffic_generator.client.add_streams(self.get_streams(),
- ports=ports[1])
+ self.drop_percent_at_max_tx = None
+ self.throughput_max = None
- self.max_rate = self.rate
- self.min_rate = 0
- traffic_generator.client.start(ports=ports,
- mult=self.get_multiplier(),
- duration=30, force=True)
- self.tmp_drop = 0
- self.tmp_throughput = 0
+ def register_generator(self, generator):
+ self.generator = generator
+ def execute(self, traffic_generator=None):
+ """ Generate the stream and run traffic on the given ports """
+ if traffic_generator is not None and self.generator is None:
+ self.generator = traffic_generator
+ if self.ports is not None:
+ return
+ self.ports = []
+ priv_ports = self.generator.priv_ports
+ pub_ports = self.generator.pub_ports
+ # start from 1 for private_1, public_1, etc.
+ for index, (priv_port, pub_port) in enumerate(zip(priv_ports, pub_ports), 1):
+ profile_data = self.params.get('private_{}'.format(index), '')
+ self.ports.append(priv_port)
+ # pass profile_data directly, don't use self.profile_data
+ self.generator.client.add_streams(self.get_streams(profile_data), ports=priv_port)
+ profile_data = self.params.get('public_{}'.format(index), '')
+ # correlated traffic doesn't use public traffic?
+ if not profile_data or self.generator.rfc2544_helper.correlated_traffic:
+ continue
+ # just get the pub_port
+ self.ports.append(pub_port)
+ self.generator.client.add_streams(self.get_streams(profile_data), ports=pub_port)
+ self.max_rate = self.rate
+ self.min_rate = 0
+ self.generator.client.start(ports=self.ports, mult=self.get_multiplier(),
+ duration=30, force=True)
+ self.drop_percent_at_max_tx = 0
+ self.throughput_max = 0
def get_multiplier(self):
- ''' Get the rate at which next iternation to run '''
+ """ Get the rate at which next iteration to run """
self.rate = round((self.max_rate + self.min_rate) / 2.0, 2)
multiplier = round(self.rate / self.pps, 2)
return str(multiplier)
- def get_drop_percentage(self, traffic_generator,
- samples, tol_min, tolerance):
- ''' Calculate the drop percentage and run the traffic '''
- in_packets = sum([samples[iface]['in_packets'] for iface in samples])
- out_packets = sum([samples[iface]['out_packets'] for iface in samples])
+ def get_drop_percentage(self, generator=None):
+ """ Calculate the drop percentage and run the traffic """
+ if generator is None:
+ generator = self.generator
+ run_duration = self.generator.RUN_DURATION
+ samples = self.generator.generate_samples()
+ in_packets = sum([value['in_packets'] for value in samples.values()])
+ out_packets = sum([value['out_packets'] for value in samples.values()])
packet_drop = abs(out_packets - in_packets)
drop_percent = 100.0
- drop_percent = round((packet_drop / float(out_packets)) * 100, 2)
+ drop_percent = round((packet_drop / float(out_packets)) * 100, 5)
except ZeroDivisionError:
LOGGING.info('No traffic is flowing')
- samples['TxThroughput'] = out_packets / 30
- samples['RxThroughput'] = in_packets / 30
- samples['CurrentDropPercentage'] = drop_percent
- samples['Throughput'] = self.tmp_throughput
- samples['DropPercentage'] = self.tmp_drop
- if drop_percent > tolerance and self.tmp_throughput == 0:
- samples['Throughput'] = (in_packets / 30)
- samples['DropPercentage'] = drop_percent
- if self.first_run:
- max_supported_rate = out_packets / 30
- self.rate = max_supported_rate
+ # TODO(esm): RFC2544 doesn't tolerate packet loss, why do we?
+ tolerance_low = generator.rfc2544_helper.tolerance_low
+ tolerance_high = generator.rfc2544_helper.tolerance_high
+ tx_rate = out_packets / run_duration
+ rx_rate = in_packets / run_duration
+ throughput_max = self.throughput_max
+ drop_percent_at_max_tx = self.drop_percent_at_max_tx
+ if self.drop_percent_at_max_tx is None:
+ self.rate = tx_rate
self.first_run = False
- if drop_percent > tolerance:
+ if drop_percent > tolerance_high:
+ # TODO(esm): why don't we discard results that are out of tolerance?
self.max_rate = self.rate
- elif drop_percent < tol_min:
+ if throughput_max == 0:
+ throughput_max = rx_rate
+ drop_percent_at_max_tx = drop_percent
+ elif drop_percent >= tolerance_low:
+ # TODO(esm): why do we update the samples dict in this case
+ # and not update our tracking values?
+ throughput_max = rx_rate
+ drop_percent_at_max_tx = drop_percent
+ elif drop_percent >= self.drop_percent_at_max_tx:
+ # TODO(esm): why don't we discard results that are out of tolerance?
self.min_rate = self.rate
- if drop_percent >= self.tmp_drop:
- self.tmp_drop = drop_percent
- self.tmp_throughput = (in_packets / 30)
- samples['Throughput'] = (in_packets / 30)
- samples['DropPercentage'] = drop_percent
+ self.drop_percent_at_max_tx = drop_percent_at_max_tx = drop_percent
+ self.throughput_max = throughput_max = rx_rate
- samples['Throughput'] = (in_packets / 30)
- samples['DropPercentage'] = drop_percent
+ # TODO(esm): why don't we discard results that are out of tolerance?
+ self.min_rate = self.rate
+ generator.clear_client_stats()
+ generator.start_client(mult=self.get_multiplier(),
+ duration=run_duration, force=True)
+ # if correlated traffic update the Throughput
+ if generator.rfc2544_helper.correlated_traffic:
+ throughput_max *= 2
+ samples.update({
+ 'TxThroughput': tx_rate,
+ 'RxThroughput': rx_rate,
+ 'CurrentDropPercentage': drop_percent,
+ 'Throughput': throughput_max,
+ 'DropPercentage': drop_percent_at_max_tx,
+ })
- traffic_generator.client.clear_stats(ports=traffic_generator.my_ports)
- traffic_generator.client.start(ports=traffic_generator.my_ports,
- mult=self.get_multiplier(),
- duration=30, force=True)
return samples
+ def execute_latency(self, generator=None, samples=None):
+ if generator is None:
+ generator = self.generator
+ if samples is None:
+ samples = generator.generate_samples()
+ self.pps, multiplier = self.calculate_pps(samples)
+ self.ports = []
+ self.pg_id = self.params['traffic_profile'].get('pg_id', 1)
+ priv_ports = generator.priv_ports
+ pub_ports = generator.pub_ports
+ for index, (priv_port, pub_port) in enumerate(zip(priv_ports, pub_ports), 1):
+ profile_data = self.params.get('private_{}'.format(index), '')
+ self.ports.append(priv_port)
+ generator.client.add_streams(self.get_streams(profile_data),
+ ports=priv_port)
+ profile_data = self.params.get('public_{}'.format(index), '')
+ if not profile_data or generator.correlated_traffic:
+ continue
+ pub_port = generator.pub_ports[index]
+ self.ports.append(pub_port)
+ generator.client.add_streams(self.get_streams(profile_data),
+ ports=pub_port)
+ generator.start_client(ports=self.ports, mult=str(multiplier),
+ duration=120, force=True)
+ self.first_run = False
+ def calculate_pps(self, samples):
+ pps = round(samples['Throughput'] / 2, 2)
+ multiplier = round(self.rate / self.pps, 2)
+ return pps, multiplier
+ def create_single_stream(self, packet_size, pps, isg=0):
+ packet = self._create_single_packet(packet_size)
+ if pps:
+ stl_mode = STLTXCont(pps=pps)
+ else:
+ stl_mode = STLTXCont(pps=self.pps)
+ if self.pg_id:
+ LOGGING.debug("pg_id: %s", self.pg_id)
+ stl_flow_stats = STLFlowLatencyStats(pg_id=self.pg_id)
+ stream = STLStream(isg=isg, packet=packet, mode=stl_mode,
+ flow_stats=stl_flow_stats)
+ self.pg_id += 1
+ else:
+ stream = STLStream(isg=isg, packet=packet, mode=stl_mode)
+ return stream
diff --git a/yardstick/network_services/traffic_profile/traffic_profile.py b/yardstick/network_services/traffic_profile/traffic_profile.py
index 156cc6644..3e1f8d89f 100644
--- a/yardstick/network_services/traffic_profile/traffic_profile.py
+++ b/yardstick/network_services/traffic_profile/traffic_profile.py
@@ -399,16 +399,19 @@ class TrexProfile(TrafficProfile):
logging.debug("Imax: %s rate: %s", imix_count, self.rate)
return imix_count
- def get_streams(self):
- """ generate trex stream """
+ def get_streams(self, profile_data):
+ """ generate trex stream
+ :param profile_data:
+ :type profile_data:
+ """
self.streams = []
self.pps = self.params['traffic_profile'].get('frame_rate', 100)
- for packet_name in self.profile_data:
- outer_l2 = self.profile_data[packet_name].get('outer_l2')
+ for packet_name in profile_data:
+ outer_l2 = profile_data[packet_name].get('outer_l2')
imix_data = self.generate_imix_data(outer_l2)
if not imix_data:
imix_data = {64: self.pps}
- self.generate_vm(self.profile_data[packet_name])
+ self.generate_vm(profile_data[packet_name])
for size in imix_data:
self._generate_streams(size, imix_data[size])
diff --git a/yardstick/network_services/utils.py b/yardstick/network_services/utils.py
index cb71a6029..38fbda47f 100644
--- a/yardstick/network_services/utils.py
+++ b/yardstick/network_services/utils.py
@@ -45,17 +45,19 @@ def get_nsb_option(option, default=None):
return default
-def provision_tool(connection, tool_path):
+def provision_tool(connection, tool_path, tool_file=None):
verify if the tool path exits on the node,
if not push the local binary to remote node
:return - Tool path
+ if tool_file:
+ tool_path = os.path.join(tool_path, tool_file)
bin_path = get_nsb_option("bin_path")
- exit_status, stdout = connection.execute("which %s" % tool_path)[:2]
+ exit_status, stdout = connection.execute("which %s > /dev/null 2>&1" % tool_path)[:2]
if exit_status == 0:
- return encodeutils.safe_decode(stdout, incoming='utf-8').rstrip()
+ return encodeutils.safe_decode(tool_path, incoming='utf-8').rstrip()
logging.warning("%s not found on %s, will try to copy from localhost",
tool_path, connection.host)
diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py
index 2df6037f3..955f9f03d 100644
--- a/yardstick/network_services/vnf_generic/vnf/base.py
+++ b/yardstick/network_services/vnf_generic/vnf/base.py
@@ -15,10 +15,6 @@
from __future__ import absolute_import
import logging
-import ipaddress
-import six
-from yardstick.network_services.utils import get_nsb_option
LOG = logging.getLogger(__name__)
@@ -61,192 +57,69 @@ class QueueFileWrapper(object):
-class GenericVNF(object):
+class VnfdHelper(dict):
+ @property
+ def mgmt_interface(self):
+ return self["mgmt-interface"]
+ @property
+ def vdu(self):
+ return self['vdu']
+ @property
+ def vdu0(self):
+ return self.vdu[0]
+ @property
+ def interfaces(self):
+ return self.vdu0['external-interface']
+ @property
+ def kpi(self):
+ return self['benchmark']['kpi']
+ def find_virtual_interface(self, **kwargs):
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ virtual_intf = interface["virtual-interface"]
+ if virtual_intf[key] == value:
+ return interface
+ def find_interface(self, **kwargs):
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ if interface[key] == value:
+ return interface
+class VNFObject(object):
+ def __init__(self, name, vnfd):
+ super(VNFObject, self).__init__()
+ self.name = name
+ self.vnfd_helper = VnfdHelper(vnfd) # fixme: parse this into a structure
+class GenericVNF(VNFObject):
""" Class providing file-like API for generic VNF implementation """
- def __init__(self, vnfd):
- super(GenericVNF, self).__init__()
- self.vnfd = vnfd # fixme: parse this into a structure
+ def __init__(self, name, vnfd):
+ super(GenericVNF, self).__init__(name, vnfd)
# List of statistics we can obtain from this VNF
# - ETSI MANO monitoring_parameter
- self.kpi = self._get_kpi_definition(vnfd)
+ self.kpi = self._get_kpi_definition()
# Standard dictionary containing params like thread no, buffer size etc
self.config = {}
self.runs_traffic = False
- self.name = "vnf__1" # name in topology file
- self.bin_path = get_nsb_option("bin_path", "")
- @classmethod
- def _get_kpi_definition(cls, vnfd):
+ def _get_kpi_definition(self):
""" Get list of KPIs defined in VNFD
:param vnfd:
:return: list of KPIs, e.g. ['throughput', 'latency']
- return vnfd['benchmark']['kpi']
- @classmethod
- def get_ip_version(cls, ip_addr):
- """ get ip address version v6 or v4 """
- try:
- address = ipaddress.ip_address(six.text_type(ip_addr))
- except ValueError:
- LOG.error(ip_addr, " is not valid")
- return
- else:
- return address.version
- def _ip_to_hex(self, ip_addr):
- ip_x = ip_addr
- if self.get_ip_version(ip_addr) == 4:
- ip_to_convert = ip_addr.split(".")
- ip_octect = [int(octect) for octect in ip_to_convert]
- ip_x = "{0[0]:02X}{0[1]:02X}{0[2]:02X}{0[3]:02X}".format(ip_octect)
- return ip_x
- def _get_dpdk_port_num(self, name):
- for intf in self.vnfd['vdu'][0]['external-interface']:
- if name == intf['name']:
- return intf['virtual-interface']['dpdk_port_num']
- def _append_routes(self, ip_pipeline_cfg):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
- where = ip_pipeline_cfg.find("arp_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_add = ip_pipeline_cfg[where:]
- tmp = route_add.find('\n')
- route_add = route_add[tmp:]
- cmds = "arp_route_tbl ="
- for route in routing_table:
- net = self._ip_to_hex(route['network'])
- net_nm = self._ip_to_hex(route['netmask'])
- net_gw = self._ip_to_hex(route['gateway'])
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_add
- return ip_pipeline_cfg
- def _append_nd_routes(self, ip_pipeline_cfg):
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
- where = ip_pipeline_cfg.find("nd_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_nd = ip_pipeline_cfg[where:]
- tmp = route_nd.find('\n')
- route_nd = route_nd[tmp:]
- cmds = "nd_route_tbl ="
- for route in routing_table:
- net = route['network']
- net_nm = route['netmask']
- net_gw = route['gateway']
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_nd
- return ip_pipeline_cfg
- def _get_port0localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0localip6 : %s", return_value)
- return return_value
- def _get_port1localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1localip6 : %s", return_value)
- return return_value
- def _get_port0prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['netmask']
- LOG.info("_get_port0prefixlen6 : %s", return_value)
- return return_value
- def _get_port1prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['netmask']
- LOG.info("_get_port1prefixlen6 : %s", return_value)
- return return_value
- def _get_port0gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0gateway6 : %s", return_value)
- return return_value
- def _get_port1gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1gateway6 : %s", return_value)
- return return_value
+ return self.vnfd_helper.kpi
def instantiate(self, scenario_cfg, context_cfg):
""" Prepare VNF for operation and start the VNF process/VM
@@ -284,11 +157,10 @@ class GenericVNF(object):
class GenericTrafficGen(GenericVNF):
""" Class providing file-like API for generic traffic generator """
- def __init__(self, vnfd):
- super(GenericTrafficGen, self).__init__(vnfd)
+ def __init__(self, name, vnfd):
+ super(GenericTrafficGen, self).__init__(name, vnfd)
self.runs_traffic = True
self.traffic_finished = False
- self.name = "tgen__1" # name in topology file
def run_traffic(self, traffic_profile):
""" Generate traffic on the wire according to the given params.
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
new file mode 100644
index 000000000..89c086d97
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -0,0 +1,994 @@
+# Copyright (c) 2016-2017 Intel Corporation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Base class implementation for generic vnf implementation """
+from __future__ import absolute_import
+import posixpath
+import time
+import logging
+import os
+import re
+import subprocess
+from collections import Mapping
+from multiprocessing import Queue, Value, Process
+from six.moves import cStringIO
+from yardstick.benchmark.contexts.base import Context
+from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.network_services.helpers.cpu import CpuSysCores
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
+from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
+from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.network_services.utils import get_nsb_option
+from stl.trex_stl_lib.trex_stl_client import STLClient
+from stl.trex_stl_lib.trex_stl_client import LoggerApi
+from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError
+from yardstick.ssh import AutoConnectSSH
+DPDK_VERSION = "dpdk-16.07"
+LOG = logging.getLogger(__name__)
+REMOTE_TMP = "/tmp"
+class VnfSshHelper(AutoConnectSSH):
+ def __init__(self, node, bin_path, wait=None):
+ self.node = node
+ kwargs = self.args_from_node(self.node)
+ if wait:
+ kwargs.setdefault('wait', wait)
+ super(VnfSshHelper, self).__init__(**kwargs)
+ self.bin_path = bin_path
+ @staticmethod
+ def get_class():
+ # must return static class name, anything else refers to the calling class
+ # i.e. the subclass, not the superclass
+ return VnfSshHelper
+ def copy(self):
+ # this copy constructor is different from SSH classes, since it uses node
+ return self.get_class()(self.node, self.bin_path)
+ def upload_config_file(self, prefix, content):
+ cfg_file = os.path.join(REMOTE_TMP, prefix)
+ LOG.debug(content)
+ file_obj = cStringIO(content)
+ self.put_file_obj(file_obj, cfg_file)
+ return cfg_file
+ def join_bin_path(self, *args):
+ return os.path.join(self.bin_path, *args)
+ def provision_tool(self, tool_path=None, tool_file=None):
+ if tool_path is None:
+ tool_path = self.bin_path
+ return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
+class SetupEnvHelper(object):
+ CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
+ CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
+ CORES = []
+ DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(SetupEnvHelper, self).__init__()
+ self.vnfd_helper = vnfd_helper
+ self.ssh_helper = ssh_helper
+ self.scenario_helper = scenario_helper
+ def _get_ports_gateway(self, name):
+ routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
+ for route in routing_table:
+ if name == route['if']:
+ return route['gateway']
+ return None
+ def build_config(self):
+ raise NotImplementedError
+ def setup_vnf_environment(self):
+ pass
+ # raise NotImplementedError
+ def tear_down(self):
+ raise NotImplementedError
+class DpdkVnfSetupEnvHelper(SetupEnvHelper):
+ APP_NAME = 'DpdkVnf'
+ DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
+ DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
+ FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
+ DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")
+ @staticmethod
+ def _update_packet_type(ip_pipeline_cfg, traffic_options):
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+ @classmethod
+ def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
+ traffic_type = traffic_options['traffic_type']
+ if traffic_options['vnf_type'] is not cls.APP_NAME:
+ match_str = 'traffic_type = 4'
+ replace_str = 'traffic_type = {0}'.format(traffic_type)
+ elif traffic_type == 4:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv4'
+ else:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv6'
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
+ self.all_ports = None
+ self.bound_pci = None
+ self._dpdk_nic_bind = None
+ self.socket = None
+ @property
+ def dpdk_nic_bind(self):
+ if self._dpdk_nic_bind is None:
+ self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
+ return self._dpdk_nic_bind
+ def _setup_hugepages(self):
+ cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
+ hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
+ memory_path = \
+ '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
+ self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
+ if hugepages == "2048kB":
+ pages = 16384
+ else:
+ pages = 16
+ self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
+ def _get_dpdk_port_num(self, name):
+ interface = self.vnfd_helper.find_interface(name=name)
+ return interface['virtual-interface']['dpdk_port_num']
+ def build_config(self):
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ task_path = self.scenario_helper.task_path
+ lb_count = vnf_cfg.get('lb_count', 3)
+ lb_config = vnf_cfg.get('lb_config', 'SW')
+ worker_config = vnf_cfg.get('worker_config', '1C/1T')
+ worker_threads = vnf_cfg.get('worker_threads', 3)
+ traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
+ traffic_options = {
+ 'traffic_type': traffic_type,
+ 'pkt_type': 'ipv%s' % traffic_type,
+ 'vnf_type': self.VNF_TYPE,
+ }
+ config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ multiport = MultiPortConfig(self.scenario_helper.topology,
+ config_tpl_cfg,
+ config_basename,
+ self.vnfd_helper.interfaces,
+ self.VNF_TYPE,
+ lb_count,
+ worker_threads,
+ worker_config,
+ lb_config,
+ self.socket)
+ multiport.generate_config()
+ with open(self.CFG_CONFIG) as handle:
+ new_config = handle.read()
+ new_config = self._update_traffic_type(new_config, traffic_options)
+ new_config = self._update_packet_type(new_config, traffic_options)
+ self.ssh_helper.upload_config_file(config_basename, new_config)
+ self.ssh_helper.upload_config_file(script_basename,
+ multiport.generate_script(self.vnfd_helper))
+ self.all_ports = multiport.port_pair_list
+ LOG.info("Provision and start the %s", self.APP_NAME)
+ self._build_pipeline_kwargs()
+ return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
+ def _build_pipeline_kwargs(self):
+ tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
+ ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
+ self.pipeline_kwargs = {
+ 'cfg_file': self.CFG_CONFIG,
+ 'script': self.CFG_SCRIPT,
+ 'ports_len_hex': ports_len_hex,
+ 'tool_path': tool_path,
+ }
+ def _get_app_cpu(self):
+ if self.CORES:
+ return self.CORES
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ sys_obj = CpuSysCores(self.ssh_helper)
+ self.sys_cpu = sys_obj.get_core_socket()
+ num_core = int(vnf_cfg["worker_threads"])
+ if vnf_cfg.get("lb_config", "SW") == 'HW':
+ num_core += self.HW_DEFAULT_CORE
+ else:
+ num_core += self.SW_DEFAULT_CORE
+ app_cpu = self.sys_cpu[str(self.socket)][:num_core]
+ return app_cpu
+ def _get_cpu_sibling_list(self, cores=None):
+ if cores is None:
+ cores = self._get_app_cpu()
+ sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
+ awk_template = "awk -F: '{ print $1 }' < %s"
+ sys_path = "/sys/devices/system/cpu/"
+ cpu_topology = []
+ try:
+ for core in cores:
+ sys_cmd = sys_cmd_template % (sys_path, core)
+ cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
+ cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
+ return cpu_topology
+ except Exception:
+ return []
+ def _validate_cpu_cfg(self):
+ return self._get_cpu_sibling_list()
+ def _find_used_drivers(self):
+ cmd = "{0} -s".format(self.dpdk_nic_bind)
+ rc, dpdk_status, _ = self.ssh_helper.execute(cmd)
+ self.used_drivers = {
+ vpci: (index, driver)
+ for index, (vpci, driver)
+ in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
+ if any(b.endswith(vpci) for b in self.bound_pci)
+ }
+ def setup_vnf_environment(self):
+ self._setup_dpdk()
+ resource = self._setup_resources()
+ self._kill_vnf()
+ self._detect_drivers()
+ return resource
+ def _kill_vnf(self):
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+ def _setup_dpdk(self):
+ """ setup dpdk environment needed for vnf to run """
+ self._setup_hugepages()
+ self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
+ exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
+ if exit_status == 0:
+ return
+ dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
+ dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
+ exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
+ if exit_status != 0:
+ self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+ def _setup_resources(self):
+ interfaces = self.vnfd_helper.interfaces
+ self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
+ # what is this magic? how do we know which socket is for which port?
+ # what about quad-socket?
+ if any(v[5] == "0" for v in self.bound_pci):
+ self.socket = 0
+ else:
+ self.socket = 1
+ cores = self._validate_cpu_cfg()
+ return ResourceProfile(self.vnfd_helper, cores)
+ def _detect_drivers(self):
+ interfaces = self.vnfd_helper.interfaces
+ self._find_used_drivers()
+ for vpci, (index, _) in self.used_drivers.items():
+ try:
+ intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
+ except StopIteration:
+ pass
+ else:
+ intf1['dpdk_port_num'] = index
+ for vpci in self.bound_pci:
+ self._bind_dpdk('igb_uio', vpci)
+ time.sleep(2)
+ def _bind_dpdk(self, driver, vpci, force=True):
+ if force:
+ force = '--force '
+ else:
+ force = ''
+ cmd = self.DPDK_BIND_CMD.format(force=force,
+ dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci)
+ self.ssh_helper.execute(cmd)
+ def _detect_and_bind_dpdk(self, vpci, driver):
+ find_net_cmd = self.FIND_NET_CMD.format(vpci)
+ exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status == 0:
+ # already bound
+ return None
+ self._bind_dpdk(driver, vpci)
+ exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status != 0:
+ # failed to bind
+ return None
+ return stdout
+ def _bind_kernel_devices(self):
+ for intf in self.vnfd_helper.interfaces:
+ vi = intf["virtual-interface"]
+ stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
+ if stdout is not None:
+ vi["local_iface_name"] = posixpath.basename(stdout)
+ def tear_down(self):
+ for vpci, (_, driver) in self.used_drivers.items():
+ self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci))
+class ResourceHelper(object):
+ MAKE_INSTALL = 'cd {0} && make && sudo make install'
+ RESOURCE_WORD = 'sample'
+ def __init__(self, setup_helper):
+ super(ResourceHelper, self).__init__()
+ self.resource = None
+ self.setup_helper = setup_helper
+ self.ssh_helper = setup_helper.ssh_helper
+ def setup(self):
+ self.resource = self.setup_helper.setup_vnf_environment()
+ def generate_cfg(self):
+ pass
+ def _collect_resource_kpi(self):
+ result = {}
+ status = self.resource.check_if_sa_running("collectd")[0]
+ if status:
+ result = self.resource.amqp_collect_nfvi_kpi()
+ result = {"core": result}
+ return result
+ def start_collect(self):
+ self.resource.initiate_systemagent(self.ssh_helper.bin_path)
+ self.resource.start()
+ self.resource.amqp_process_for_nfvi_kpi()
+ def stop_collect(self):
+ if self.resource:
+ self.resource.stop()
+ def collect_kpi(self):
+ return self._collect_resource_kpi()
+class ClientResourceHelper(ResourceHelper):
+ def __init__(self, setup_helper):
+ super(ClientResourceHelper, self).__init__(setup_helper)
+ self.vnfd_helper = setup_helper.vnfd_helper
+ self.scenario_helper = setup_helper.scenario_helper
+ self.client = None
+ self.client_started = Value('i', 0)
+ self.my_ports = None
+ self._queue = Queue()
+ self._result = {}
+ self._terminated = Value('i', 0)
+ self._vpci_ascending = None
+ def _build_ports(self):
+ self.my_ports = [0, 1]
+ def get_stats(self, *args, **kwargs):
+ try:
+ return self.client.get_stats(*args, **kwargs)
+ except STLStateError:
+ LOG.exception("TRex client not connected")
+ return {}
+ def generate_samples(self, key=None, default=None):
+ last_result = self.get_stats(self.my_ports)
+ key_value = last_result.get(key, default)
+ if not isinstance(last_result, Mapping): # added for mock unit test
+ self._terminated.value = 1
+ return {}
+ samples = {}
+ for vpci_idx, vpci in enumerate(self._vpci_ascending):
+ name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
+ # fixme: VNFDs KPIs values needs to be mapped to TRex structure
+ xe_value = last_result.get(vpci_idx, {})
+ samples[name] = {
+ "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
+ "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
+ "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
+ "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
+ "in_packets": int(xe_value.get("ipackets", 0)),
+ "out_packets": int(xe_value.get("opackets", 0)),
+ }
+ if key:
+ samples[name][key] = key_value
+ return samples
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ samples = self.generate_samples()
+ time.sleep(self.QUEUE_WAIT_TIME)
+ self._queue.put(samples)
+ def run_traffic(self, traffic_profile):
+ # fixme: fix passing correct trex config file,
+ # instead of searching the default path
+ self._build_ports()
+ self.client = self._connect()
+ self.client.reset(ports=self.my_ports)
+ self.client.remove_all_streams(self.my_ports) # remove all streams
+ traffic_profile.register_generator(self)
+ while self._terminated.value == 0:
+ self._run_traffic_once(traffic_profile)
+ self.client.stop(self.my_ports)
+ self.client.disconnect()
+ self._terminated.value = 0
+ def terminate(self):
+ self._terminated.value = 1 # stop client
+ def clear_stats(self, ports=None):
+ if ports is None:
+ ports = self.my_ports
+ self.client.clear_stats(ports=ports)
+ def start(self, ports=None, *args, **kwargs):
+ if ports is None:
+ ports = self.my_ports
+ self.client.start(ports=ports, *args, **kwargs)
+ def collect_kpi(self):
+ if not self._queue.empty():
+ kpi = self._queue.get()
+ self._result.update(kpi)
+ LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result))
+ return self._result
+ def _connect(self, client=None):
+ if client is None:
+ client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
+ server=self.vnfd_helper.mgmt_interface["ip"],
+ verbose_level=LoggerApi.VERBOSE_QUIET)
+ # try to connect with 5s intervals, 30s max
+ for idx in range(6):
+ try:
+ client.connect()
+ break
+ except STLError:
+ LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
+ time.sleep(5)
+ return client
+class Rfc2544ResourceHelper(object):
+ DEFAULT_TOLERANCE = '0.0001 - 0.0001'
+ def __init__(self, scenario_helper):
+ super(Rfc2544ResourceHelper, self).__init__()
+ self.scenario_helper = scenario_helper
+ self._correlated_traffic = None
+ self.iteration = Value('i', 0)
+ self._latency = None
+ self._rfc2544 = None
+ self._tolerance_low = None
+ self._tolerance_high = None
+ @property
+ def rfc2544(self):
+ if self._rfc2544 is None:
+ self._rfc2544 = self.scenario_helper.all_options['rfc2544']
+ return self._rfc2544
+ @property
+ def tolerance_low(self):
+ if self._tolerance_low is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_low
+ @property
+ def tolerance_high(self):
+ if self._tolerance_high is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_high
+ @property
+ def correlated_traffic(self):
+ if self._correlated_traffic is None:
+ self._correlated_traffic = \
+ self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
+ return self._correlated_traffic
+ @property
+ def latency(self):
+ if self._latency is None:
+ self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
+ return self._latency
+ def get_rfc2544(self, name, default=None):
+ return self.rfc2544.get(name, default)
+ def get_rfc_tolerance(self):
+ tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
+ tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
+ self._tolerance_low = next(tolerance_iter)
+ self._tolerance_high = next(tolerance_iter, self.tolerance_low)
+class SampleVNFDeployHelper(object):
+ SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
+ REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
+ SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
+ def __init__(self, vnfd_helper, ssh_helper):
+ super(SampleVNFDeployHelper, self).__init__()
+ self.ssh_helper = ssh_helper
+ self.vnfd_helper = vnfd_helper
+ def deploy_vnfs(self, app_name):
+ # temp disable for now
+ return
+ vnf_bin = self.ssh_helper.join_bin_path(app_name)
+ exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
+ if not exit_status:
+ return
+ subprocess.check_output(["rm", "-rf", self.REPO_NAME])
+ subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
+ time.sleep(2)
+ self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
+ self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
+ build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
+ time.sleep(2)
+ http_proxy = os.environ.get('http_proxy', '')
+ https_proxy = os.environ.get('https_proxy', '')
+ cmd = "sudo -E %s --silent '%s' '%s'" % (build_script, http_proxy, https_proxy)
+ LOG.debug(cmd)
+ self.ssh_helper.execute(cmd)
+ vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
+ self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
+ self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
+class ScenarioHelper(object):
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1,
+ }
+ def __init__(self, name):
+ self.name = name
+ self.scenario_cfg = None
+ @property
+ def task_path(self):
+ return self.scenario_cfg["task_path"]
+ @property
+ def nodes(self):
+ return self.scenario_cfg['nodes']
+ @property
+ def all_options(self):
+ return self.scenario_cfg["options"]
+ @property
+ def options(self):
+ return self.all_options[self.name]
+ @property
+ def vnf_cfg(self):
+ return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
+ @property
+ def topology(self):
+ return self.scenario_cfg['topology']
+class SampleVNF(GenericVNF):
+ """ Class providing file-like API for generic VNF implementation """
+ VNF_PROMPT = "pipeline>"
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNF, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+ self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
+ if resource_helper_type is None:
+ resource_helper_type = ResourceHelper
+ self.resource_helper = resource_helper_type(self.setup_helper)
+ self.all_ports = None
+ self.context_cfg = None
+ self.nfvi_context = None
+ self.pipeline_kwargs = {}
+ self.priv_ports = None
+ self.pub_ports = None
+ # TODO(esm): make QueueFileWrapper invert-able so that we
+ # never have to manage the queues
+ self.q_in = Queue()
+ self.q_out = Queue()
+ self.queue_wrapper = None
+ self.run_kwargs = {}
+ self.scenario_cfg = None
+ self.tg_port_pairs = None
+ self.used_drivers = {}
+ self.vnf_port_pairs = None
+ self._vnf_process = None
+ def _get_route_data(self, route_index, route_type):
+ route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
+ for _ in range(route_index):
+ next(route_iter, '')
+ return next(route_iter, {}).get(route_type, '')
+ def _get_port0localip6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0localip6 : %s", return_value)
+ return return_value
+ def _get_port1localip6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1localip6 : %s", return_value)
+ return return_value
+ def _get_port0prefixlen6(self):
+ return_value = self._get_route_data(0, 'netmask')
+ LOG.info("_get_port0prefixlen6 : %s", return_value)
+ return return_value
+ def _get_port1prefixlen6(self):
+ return_value = self._get_route_data(1, 'netmask')
+ LOG.info("_get_port1prefixlen6 : %s", return_value)
+ return return_value
+ def _get_port0gateway6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0gateway6 : %s", return_value)
+ return return_value
+ def _get_port1gateway6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1gateway6 : %s", return_value)
+ return return_value
+ def _start_vnf(self):
+ self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
+ self._vnf_process = Process(target=self._run)
+ self._vnf_process.start()
+ def _vnf_up_post(self):
+ pass
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.context_cfg = context_cfg
+ self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
+ # self.nfvi_context = None
+ self.deploy_helper.deploy_vnfs(self.APP_NAME)
+ self.resource_helper.setup()
+ self._start_vnf()
+ def wait_for_instantiate(self):
+ buf = []
+ time.sleep(self.WAIT_TIME) # Give some time for config to load
+ while True:
+ if not self._vnf_process.is_alive():
+ raise RuntimeError("%s VNF process died." % self.APP_NAME)
+ # TODO(esm): move to QueueFileWrapper
+ while self.q_out.qsize() > 0:
+ buf.append(self.q_out.get())
+ message = ''.join(buf)
+ if self.VNF_PROMPT in message:
+ LOG.info("%s VNF is up and running.", self.APP_NAME)
+ self._vnf_up_post()
+ self.queue_wrapper.clear()
+ self.resource_helper.start_collect()
+ return self._vnf_process.exitcode
+ if "PANIC" in message:
+ raise RuntimeError("Error starting %s VNF." %
+ self.APP_NAME)
+ LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
+ time.sleep(1)
+ def _build_run_kwargs(self):
+ self.run_kwargs = {
+ 'stdin': self.queue_wrapper,
+ 'stdout': self.queue_wrapper,
+ 'keep_stdin_open': True,
+ 'pty': True,
+ }
+ def _build_config(self):
+ return self.setup_helper.build_config()
+ def _run(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+ cmd = self._build_config()
+ # kill before starting
+ self.ssh_helper.execute("pkill {}".format(self.APP_NAME))
+ LOG.debug(cmd)
+ self._build_run_kwargs()
+ self.ssh_helper.run(cmd, **self.run_kwargs)
+ def vnf_execute(self, cmd, wait_time=2):
+ """ send cmd to vnf process """
+ LOG.info("%s command: %s", self.APP_NAME, cmd)
+ self.q_in.put("{}\r\n".format(cmd))
+ time.sleep(wait_time)
+ output = []
+ while self.q_out.qsize() > 0:
+ output.append(self.q_out.get())
+ return "".join(output)
+ def _tear_down(self):
+ pass
+ def terminate(self):
+ self.vnf_execute("quit")
+ if self._vnf_process:
+ self._vnf_process.terminate()
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+ self._tear_down()
+ self.resource_helper.stop_collect()
+ def get_stats(self, *args, **kwargs):
+ """
+ Method for checking the statistics
+ :return:
+ VNF statistics
+ """
+ cmd = 'p {0} stats'.format(self.APP_WORD)
+ out = self.vnf_execute(cmd)
+ return out
+ def collect_kpi(self):
+ stats = self.get_stats()
+ m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if m:
+ result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
+ result["collect_stats"] = self.resource_helper.collect_kpi()
+ else:
+ result = {
+ "packets_in": 0,
+ "packets_fwd": 0,
+ "packets_dropped": 0,
+ }
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+class SampleVNFTrafficGen(GenericTrafficGen):
+ """ Class providing file-like API for generic traffic generator """
+ APP_NAME = 'Sample'
+ RUN_WAIT = 1
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+ self.name = "tgen__1" # name in topology file
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+ if resource_helper_type is None:
+ resource_helper_type = ClientResourceHelper
+ self.resource_helper = resource_helper_type(self.setup_helper)
+ self.runs_traffic = True
+ self.traffic_finished = False
+ self.tg_port_pairs = None
+ self._tg_process = None
+ self._traffic_process = None
+ def _start_server(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.resource_helper.generate_cfg()
+ self.setup_helper.setup_vnf_environment()
+ self.resource_helper.setup()
+ LOG.info("Starting %s server...", self.APP_NAME)
+ self._tg_process = Process(target=self._start_server)
+ self._tg_process.start()
+ def wait_for_instantiate(self):
+ return self._wait_for_process()
+ def _check_status(self):
+ raise NotImplementedError
+ def _wait_for_process(self):
+ while True:
+ if not self._tg_process.is_alive():
+ raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
+ LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
+ time.sleep(1)
+ status = self._check_status()
+ if status == 0:
+ LOG.info("%s TG Server is up and running.", self.APP_NAME)
+ return self._tg_process.exitcode
+ def _traffic_runner(self, traffic_profile):
+ LOG.info("Starting %s client...", self.APP_NAME)
+ self.resource_helper.run_traffic(traffic_profile)
+ def run_traffic(self, traffic_profile):
+ """ Generate traffic on the wire according to the given params.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Mandatory.
+ :param traffic_profile:
+ :return: True/False
+ """
+ self._traffic_process = Process(target=self._traffic_runner,
+ args=(traffic_profile,))
+ self._traffic_process.start()
+ # Wait for traffic process to start
+ while self.resource_helper.client_started.value == 0:
+ time.sleep(self.RUN_WAIT)
+ return self._traffic_process.is_alive()
+ def listen_traffic(self, traffic_profile):
+ """ Listen to traffic with the given parameters.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Optional.
+ :param traffic_profile:
+ :return: True/False
+ """
+ pass
+ def verify_traffic(self, traffic_profile):
+ """ Verify captured traffic after it has ended. Optional.
+ :param traffic_profile:
+ :return: dict
+ """
+ pass
+ def collect_kpi(self):
+ result = self.resource_helper.collect_kpi()
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+ def terminate(self):
+ """ After this method finishes, all traffic processes should stop. Mandatory.
+ :return: True/False
+ """
+ self.traffic_finished = True
+ if self._traffic_process is not None:
+ self._traffic_process.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
index 000a91db4..e65296287 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
@@ -16,14 +16,13 @@
from __future__ import absolute_import
from __future__ import print_function
import logging
-import multiprocessing
import re
-import time
-import os
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import provision_tool
+from multiprocessing import Queue
+from ipaddress import IPv4Interface
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
LOG = logging.getLogger(__name__)
@@ -42,77 +41,59 @@ class PingParser(object):
if match:
# IMPORTANT: in order for the data to be properly taken
# in by InfluxDB, it needs to be converted to numeric types
- self.queue.put({"packets_received": float(match.group(1)),
- "rtt": float(match.group(2))})
+ self.queue.put({
+ "packets_received": float(match.group(1)),
+ "rtt": float(match.group(2)),
+ })
def close(self):
- ''' close the ssh connection '''
- pass
+ """ close the ssh connection """
+ self.closed = True
def clear(self):
- ''' clear queue till Empty '''
+ """ clear queue till Empty """
while self.queue.qsize() > 0:
-class PingTrafficGen(GenericTrafficGen):
+class PingSetupEnvHelper(DpdkVnfSetupEnvHelper):
+ def setup_vnf_environment(self):
+ self._bind_kernel_devices()
+class PingTrafficGen(SampleVNFTrafficGen):
This traffic generator can ping a single IP with pingsize
and target given in traffic profile
- def __init__(self, vnfd):
- super(PingTrafficGen, self).__init__(vnfd)
+ TG_NAME = 'Ping'
+ RUN_WAIT = 4
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = PingSetupEnvHelper
+ super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
+ self._queue = Queue()
+ self._parser = PingParser(self._queue)
self._result = {}
- self._parser = None
- self._queue = None
- self._traffic_process = None
- mgmt_interface = vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
- def _bind_device_kernel(self, connection):
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
- drivers = {intf["virtual-interface"]["vpci"]:
- intf["virtual-interface"]["driver"]
- for intf in self.vnfd["vdu"][0]["external-interface"]}
- commands = \
- ['"{0}" --force -b "{1}" "{2}"'.format(dpdk_nic_bind, value, key)
- for key, value in drivers.items()]
- for command in commands:
- connection.execute(command)
- for index, out in enumerate(self.vnfd["vdu"][0]["external-interface"]):
- vpci = out["virtual-interface"]["vpci"]
- net = "find /sys/class/net -lname '*{}*' -printf '%f'".format(vpci)
- out = connection.execute(net)[1]
- ifname = out.split('/')[-1].strip('\n')
- self.vnfd["vdu"][0]["external-interface"][index][
- "virtual-interface"]["local_iface_name"] = ifname
def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(PingTrafficGen, self).scale(flavor)
+ """ scale vnf-based on flavor input """
+ pass
- def instantiate(self, scenario_cfg, context_cfg):
- self._result = {"packets_received": 0, "rtt": 0}
- self._bind_device_kernel(self.connection)
+ def _check_status(self):
+ return self._tg_process.is_alive()
- def run_traffic(self, traffic_profile):
- self._queue = multiprocessing.Queue()
- self._parser = PingParser(self._queue)
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._parser))
- self._traffic_process.start()
- # Wait for traffic process to start
- time.sleep(4)
- return self._traffic_process.is_alive()
+ def instantiate(self, scenario_cfg, context_cfg):
+ self._result = {
+ "packets_received": 0,
+ "rtt": 0,
+ }
+ self.setup_helper.setup_vnf_environment()
def listen_traffic(self, traffic_profile):
""" Not needed for ping
@@ -122,38 +103,26 @@ class PingTrafficGen(GenericTrafficGen):
- def _traffic_runner(self, traffic_profile, filewrapper):
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
- external_interface = self.vnfd["vdu"][0]["external-interface"]
- virtual_interface = external_interface[0]["virtual-interface"]
- target_ip = virtual_interface["dst_ip"].split('/')[0]
- local_ip = virtual_interface["local_ip"].split('/')[0]
- local_if_name = \
- virtual_interface["local_iface_name"].split('/')[0]
- packet_size = traffic_profile.params["traffic_profile"]["frame_size"]
- run_cmd = []
- run_cmd.append("ip addr flush %s" % local_if_name)
- run_cmd.append("ip addr add %s/24 dev %s" % (local_ip, local_if_name))
- run_cmd.append("ip link set %s up" % local_if_name)
- for cmd in run_cmd:
- self.connection.execute(cmd)
- ping_cmd = ("ping -s %s %s" % (packet_size, target_ip))
- self.connection.run(ping_cmd, stdout=filewrapper,
+ def _traffic_runner(self, traffic_profile):
+ intf = self.vnfd_helper.interfaces[0]["virtual-interface"]
+ profile = traffic_profile.params["traffic_profile"]
+ cmd_kwargs = {
+ 'target_ip': IPv4Interface(intf["dst_ip"]).ip.exploded,
+ 'local_ip': IPv4Interface(intf["local_ip"]).ip.exploded,
+ 'local_if_name': intf["local_iface_name"].split('/')[0],
+ 'packet_size': profile["frame_size"],
+ }
+ cmd_list = [
+ "sudo ip addr flush {local_if_name}",
+ "sudo ip addr add {local_ip}/24 dev {local_if_name}",
+ "sudo ip link set {local_if_name} up",
+ ]
+ for cmd in cmd_list:
+ self.ssh_helper.execute(cmd.format(**cmd_kwargs))
+ ping_cmd = "ping -s {packet_size} {target_ip}"
+ self.ssh_helper.run(ping_cmd.format(**cmd_kwargs),
+ stdout=self._parser,
keep_stdin_open=True, pty=True)
- def collect_kpi(self):
- if not self._queue.empty():
- kpi = self._queue.get()
- self._result.update(kpi)
- return self._result
- def terminate(self):
- if self._traffic_process is not None:
- self._traffic_process.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
index 7da4b31e9..79e42e0a8 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
@@ -15,267 +15,98 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
import time
import logging
-import os
-import yaml
+from collections import Mapping
+from itertools import chain
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import get_nsb_option
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper
LOGGING = logging.getLogger(__name__)
+class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper):
-class TrexTrafficGenRFC(GenericTrafficGen):
- """
- This class handles mapping traffic profile and generating
- traffic for rfc2544 testcase.
- """
- def __init__(self, vnfd):
- super(TrexTrafficGenRFC, self).__init__(vnfd)
- self._result = {}
- self._terminated = multiprocessing.Value('i', 0)
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.tc_file_name = None
- self.client = None
- self.my_ports = None
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
- def _generate_trex_cfg(self, vnfd):
- """
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
- cfg_file = []
- vpci = []
- port = {}
- ext_intf = vnfd["vdu"][0]["external-interface"]
- for interface in ext_intf:
- virt_intf = interface["virtual-interface"]
- vpci.append(virt_intf["vpci"])
- port["src_mac"] = \
- self._split_mac_address_into_list(virt_intf["local_mac"])
- time.sleep(WAIT_TIME)
- port["dest_mac"] = \
- self._split_mac_address_into_list(virt_intf["dst_mac"])
- if virt_intf["dst_mac"]:
- trex_cfg["port_info"].append(port.copy())
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
- self._vpci_ascending = sorted(vpci)
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGenRFC, self).scale(flavor)
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
- trex = os.path.join(self.bin_path, "trex")
- err, _, _ = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))
- if err != 0:
- self.connection.put(trex, trex, True)
+ def is_done(self):
+ return self.latency and self.iteration.value > 10
- LOGGING.debug("Starting TRex server...")
- _tg_server = \
- multiprocessing.Process(target=self._start_server)
- _tg_server.start()
- while True:
- LOGGING.info("Waiting for TG Server to start.. ")
- time.sleep(WAIT_TIME)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOGGING.info("TG server is up and running.")
- return _tg_server.exitcode
- if not _tg_server.is_alive():
- raise RuntimeError("Traffic Generator process died.")
+class TrexRfcResourceHelper(TrexResourceHelper):
- def listen_traffic(self, traffic_profile):
- pass
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
+ def __init__(self, setup_helper, rfc_helper_type=None):
+ super(TrexRfcResourceHelper, self).__init__(setup_helper)
- def run_traffic(self, traffic_profile,
- client_started=multiprocessing.Value('i', 0)):
+ if rfc_helper_type is None:
+ rfc_helper_type = TrexRfc2544ResourceHelper
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- client_started, self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while client_started.value == 0:
- time.sleep(1)
+ self.rfc2544_helper = rfc_helper_type(self.scenario_helper)
+ # self.tg_port_pairs = []
- return self._traffic_process.is_alive()
+ def _build_ports(self):
+ self.tg_port_pairs, self.networks = MultiPortConfig.get_port_pairs(
+ self.vnfd_helper.interfaces)
+ self.priv_ports = [int(x[0][-1]) for x in self.tg_port_pairs]
+ self.pub_ports = [int(x[1][-1]) for x in self.tg_port_pairs]
+ self.my_ports = list(set(chain(self.priv_ports, self.pub_ports)))
- def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- _server.execute("pkill -9 rex > /dev/null 2>&1")
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
- _server.execute(trex_cmd)
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOGGING.info("Unable to connect to Trex. Attempt %s", idx)
- time.sleep(WAIT_TIME)
- return client
- @classmethod
- def _get_rfc_tolerance(cls, tc_yaml):
- tolerance = '0.8 - 1.0'
- if 'tc_options' in tc_yaml['scenarios'][0]:
- tc_options = tc_yaml['scenarios'][0]['tc_options']
- if 'rfc2544' in tc_options:
- tolerance = \
- tc_options['rfc2544'].get('allowed_drop_rate', '0.8 - 1.0')
- tolerance = tolerance.split('-')
- min_tol = float(tolerance[0])
- if len(tolerance) == 2:
- max_tol = float(tolerance[1])
- else:
- max_tol = float(tolerance[0])
- return [min_tol, max_tol]
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOGGING.info("Starting TRex client...")
- tc_yaml = {}
- with open(self.tc_file_name) as tc_file:
- tc_yaml = yaml.load(tc_file.read())
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ self.client.stop(self.my_ports)
+ time.sleep(self.WAIT_TIME)
+ samples = traffic_profile.get_drop_percentage(self)
+ self._queue.put(samples)
- tolerance = self._get_rfc_tolerance(tc_yaml)
+ if not self.rfc2544_helper.is_done():
+ return
- # fixme: fix passing correct trex config file,
- # instead of searching the default path
- self.my_ports = [0, 1]
- self.client = self._connect_client()
+ self.client.stop(self.my_ports)
- self.client.remove_all_streams(self.my_ports) # remove all streams
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- time.sleep(DURATION)
+ self.client.remove_all_streams(self.my_ports)
+ traffic_profile.execute_latency(samples=samples)
+ multiplier = traffic_profile.calculate_pps(samples)[1]
+ for _ in range(5):
+ time.sleep(self.LATENCY_TIME_SLEEP)
- time.sleep(WAIT_TIME)
+ time.sleep(self.WAIT_TIME)
last_res = self.client.get_stats(self.my_ports)
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- if not isinstance(last_res, dict):
- terminated.value = 1
- last_res = {}
+ if not isinstance(last_res, Mapping):
+ self._terminated.value = 1
+ continue
+ self.generate_samples('latency', {})
+ self._queue.put(samples)
+ self.client.start(mult=str(multiplier),
+ ports=self.my_ports,
+ duration=120, force=True)
- samples[name] = \
- {"rx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("rx_pps", 0.0)),
- "tx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("tx_pps", 0.0)),
- "rx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("rx_bps", 0.0)),
- "tx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("tx_bps", 0.0)),
- "in_packets":
- last_res.get(vpci_idx, {}).get("ipackets", 0),
- "out_packets":
- last_res.get(vpci_idx, {}).get("opackets", 0)}
+ def start_client(self, mult, duration, force=True):
+ self.client.start(ports=self.my_ports, mult=mult, duration=duration, force=force)
- samples = \
- traffic_profile.get_drop_percentage(self, samples,
- tolerance[0], tolerance[1])
- queue.put(samples)
- self.client.stop(self.my_ports)
- self.client.disconnect()
- queue.put(samples)
+ def clear_client_stats(self):
+ self.client.clear_stats(ports=self.my_ports)
def collect_kpi(self):
- if not self._queue.empty():
- result = self._queue.get()
- self._result.update(result)
- LOGGING.debug("trex collect Kpis %s", self._result)
- return self._result
+ self.rfc2544_helper.iteration.value += 1
+ super(TrexRfcResourceHelper, self).collect_kpi()
- def terminate(self):
- self._terminated.value = 1 # stop Trex clinet
+class TrexTrafficGenRFC(TrexTrafficGen):
+ """
+ This class handles mapping traffic profile and generating
+ traffic for rfc2544 testcase.
+ """
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexRfcResourceHelper
- if self._traffic_process:
- self._traffic_process.terminate()
+ super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
index 058b715fe..616b331ba 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
@@ -15,261 +15,136 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
-import time
import logging
import os
import yaml
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.common.utils import mac_address_to_hex_list
from yardstick.network_services.utils import get_nsb_option
-from yardstick.network_services.utils import provision_tool
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
LOG = logging.getLogger(__name__)
-class TrexTrafficGen(GenericTrafficGen):
+class TrexResourceHelper(ClientResourceHelper):
+ CONF_FILE = '/tmp/trex_cfg.yaml'
+ RESOURCE_WORD = 'trex'
+ SYNC_PORT = 4500
+ ASYNC_PORT = 4501
+ def generate_cfg(self):
+ ext_intf = self.vnfd_helper.interfaces
+ vpci_list = []
+ port_list = []
+ trex_cfg = {
+ 'port_limit': 0,
+ 'version': '2',
+ 'interfaces': vpci_list,
+ 'port_info': port_list,
+ "port_limit": len(ext_intf),
+ "version": '2',
+ }
+ cfg_file = [trex_cfg]
+ for interface in ext_intf:
+ virtual_interface = interface['virtual-interface']
+ vpci_list.append(virtual_interface["vpci"])
+ dst_mac = virtual_interface["dst_mac"]
+ if not dst_mac:
+ continue
+ local_mac = virtual_interface["local_mac"]
+ port_list.append({
+ "src_mac": mac_address_to_hex_list(local_mac),
+ "dest_mac": mac_address_to_hex_list(dst_mac),
+ })
+ cfg_str = yaml.safe_dump(cfg_file, default_flow_style=False, explicit_start=True)
+ self.ssh_helper.upload_config_file(os.path.basename(self.CONF_FILE), cfg_str)
+ self._vpci_ascending = sorted(vpci_list)
+ def check_status(self):
+ status, _, _ = self.ssh_helper.execute("sudo lsof -i:%s" % self.SYNC_PORT)
+ return status
+ # temp disable
+ def setup(self):
+ return
+ trex_path = self.ssh_helper.join_bin_path('trex')
+ err = self.ssh_helper.execute("which {}".format(trex_path))[0]
+ if err == 0:
+ return
+ LOG.info("Copying %s to destination...", self.RESOURCE_WORD)
+ self.ssh_helper.run("sudo mkdir -p '{}'".format(os.path.dirname(trex_path)))
+ self.ssh_helper.put("~/.bash_profile", "~/.bash_profile")
+ self.ssh_helper.put(trex_path, trex_path, True)
+ ko_src = os.path.join(trex_path, "scripts/ko/src/")
+ self.ssh_helper.execute(self.MAKE_INSTALL.format(ko_src))
+ def start(self, ports=None, *args, **kwargs):
+ cmd = "sudo fuser -n tcp {0.SYNC_PORT} {0.ASYNC_PORT} -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd.format(self))
+ self.ssh_helper.execute("sudo pkill -9 rex > /dev/null 2>&1")
+ trex_path = self.ssh_helper.join_bin_path("trex", "scripts")
+ path = get_nsb_option("trex_path", trex_path)
+ # cmd = "sudo ./t-rex-64 -i --cfg %s > /dev/null 2>&1" % self.CONF_FILE
+ cmd = "./t-rex-64 -i --cfg '{}'".format(self.CONF_FILE)
+ # if there are errors we want to see them
+ # we have to sudo cd because the path might be owned by root
+ trex_cmd = """sudo bash -c "cd '{}' ; {}" >/dev/null""".format(path, cmd)
+ self.ssh_helper.execute(trex_cmd)
+ def terminate(self):
+ super(TrexResourceHelper, self).terminate()
+ cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT))
+class TrexTrafficGen(SampleVNFTrafficGen):
This class handles mapping traffic profile and generating
traffic for given testcase
- def __init__(self, vnfd):
- super(TrexTrafficGen, self).__init__(vnfd)
- self._result = {}
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.client = None
- self.my_ports = None
- self.client_started = multiprocessing.Value('i', 0)
- mgmt_interface = vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
- def _generate_trex_cfg(self, vnfd):
- """
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
- cfg_file = []
- vpci = []
- port = {}
- for interface in range(len(vnfd["vdu"][0]["external-interface"])):
- ext_intrf = vnfd["vdu"][0]["external-interface"]
- virtual_interface = ext_intrf[interface]["virtual-interface"]
- vpci.append(virtual_interface["vpci"])
- port["src_mac"] = self._split_mac_address_into_list(
- virtual_interface["local_mac"])
- port["dest_mac"] = self._split_mac_address_into_list(
- virtual_interface["dst_mac"])
- trex_cfg["port_info"].append(port.copy())
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
- self._vpci_ascending = sorted(vpci)
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
+ APP_NAME = 'TRex'
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexResourceHelper
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGen, self).scale(flavor)
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.setup_vnf_environment(self.connection)
- trex = os.path.join(self.bin_path, "trex")
- err = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))[0]
- if err != 0:
- LOG.info("Copying trex to destination...")
- self.connection.put("/root/.bash_profile", "/root/.bash_profile")
- self.connection.put(trex, trex, True)
- ko_src = os.path.join(trex, "scripts/ko/src/")
- self.connection.execute("cd %s && make && make install" % ko_src)
- LOG.info("Starting TRex server...")
- _tg_process = \
- multiprocessing.Process(target=self._start_server)
- _tg_process.start()
- while True:
- if not _tg_process.is_alive():
- raise RuntimeError("Traffic Generator process died.")
- LOG.info("Waiting for TG Server to start.. ")
- time.sleep(1)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOG.info("TG server is up and running.")
- return _tg_process.exitcode
- def listen_traffic(self, traffic_profile):
- pass
+ super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
- def run_traffic(self, traffic_profile):
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- self.client_started,
- self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while self.client_started.value == 0:
- time.sleep(1)
- return self._traffic_process.is_alive()
+ def _check_status(self):
+ return self.resource_helper.check_status()
def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
+ super(TrexTrafficGen, self)._start_server()
+ self.resource_helper.start()
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
+ def scale(self, flavor=""):
+ pass
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
- _server.execute(trex_cmd)
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- # try to connect with 5s intervals, 30s max
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
- time.sleep(5)
- return client
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOG.info("Starting TRex client...")
- self.my_ports = [0, 1]
- self.client = self._connect_client()
- self.client.reset(ports=self.my_ports)
- self.client.remove_all_streams(self.my_ports) # remove all streams
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- last_res = self.client.get_stats(self.my_ports)
- if not isinstance(last_res, dict): # added for mock unit test
- terminated.value = 1
- last_res = {}
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- xe_value = last_res.get(vpci_idx, {})
- samples[name] = \
- {"rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
- "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
- "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
- "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
- "in_packets": xe_value.get("ipackets", 0),
- "out_packets": xe_value.get("opackets", 0)}
- time.sleep(WAIT_QUEUE)
- queue.put(samples)
- self.client.disconnect()
- terminated.value = 0
- def collect_kpi(self):
- if not self._queue.empty():
- self._result.update(self._queue.get())
- LOG.debug("trex collect Kpis %s", self._result)
- return self._result
+ def listen_traffic(self, traffic_profile):
+ pass
def terminate(self):
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- self.traffic_finished = True
- if self._traffic_process:
- self._traffic_process.terminate()
+ self.resource_helper.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
index e9e80bdfb..310ab67cb 100644
--- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
@@ -15,313 +15,268 @@
from __future__ import absolute_import
from __future__ import print_function
-import tempfile
-import time
import os
import logging
import re
-from multiprocessing import Queue
-import multiprocessing
-import ipaddress
-import six
+import posixpath
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
-from yardstick.network_services.utils import provision_tool
-from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
-from yardstick.network_services.nfvi.resource import ResourceProfile
+from six.moves import configparser, zip
-LOG = logging.getLogger(__name__)
-VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}'
-CORES = ['0', '1', '2']
+from yardstick.network_services.pipeline import PipelineRules
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+LOG = logging.getLogger(__name__)
-class VpeApproxVnf(GenericVNF):
+VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}"""
+Pkts in:\s(\d+)\r\n\
+\tPkts dropped by Pkts in:\s(\d+)\r\n\
+\tPkts dropped by AH:\s(\d+)\r\n\\
+\tPkts dropped by other:\s(\d+)\
+class ConfigCreate(object):
+ @staticmethod
+ def vpe_tmq(config, index):
+ tm_q = 'TM{0}'.format(index)
+ config.add_section(tm_q)
+ config.set(tm_q, 'burst_read', '24')
+ config.set(tm_q, 'burst_write', '32')
+ config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg')
+ return config
+ def __init__(self, priv_ports, pub_ports, socket):
+ super(ConfigCreate, self).__init__()
+ self.sw_q = -1
+ self.sink_q = -1
+ self.n_pipeline = 1
+ self.priv_ports = priv_ports
+ self.pub_ports = pub_ports
+ self.pipeline_per_port = 9
+ self.socket = socket
+ def vpe_initialize(self, config):
+ config.add_section('EAL')
+ config.set('EAL', 'log_level', '0')
+ config.add_section('PIPELINE0')
+ config.set('PIPELINE0', 'type', 'MASTER')
+ config.set('PIPELINE0', 'core', 's%sC0' % self.socket)
+ config.add_section('MEMPOOL0')
+ config.set('MEMPOOL0', 'pool_size', '256K')
+ config.add_section('MEMPOOL1')
+ config.set('MEMPOOL1', 'pool_size', '2M')
+ return config
+ def vpe_rxq(self, config):
+ for port in self.pub_ports:
+ new_section = 'RXQ{0}.0'.format(port)
+ config.add_section(new_section)
+ config.set(new_section, 'mempool', 'MEMPOOL1')
+ return config
+ def get_sink_swq(self, parser, pipeline, k, index):
+ sink = ""
+ pktq = parser.get(pipeline, k)
+ if "SINK" in pktq:
+ self.sink_q += 1
+ sink = " SINK{0}".format(self.sink_q)
+ if "TM" in pktq:
+ sink = " TM{0}".format(index)
+ pktq = "SWQ{0}{1}".format(self.sw_q, sink)
+ return pktq
+ def vpe_upstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_upstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ if k == "pktq_in":
+ index = intf['index']
+ if "RXQ" in v:
+ value = "RXQ{0}.0".format(index)
+ else:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ parser.set(pipeline, k, value)
+ elif k == "pktq_out":
+ index = intf['peer_intf']['index']
+ if "TXQ" in v:
+ value = "TXQ{0}.0".format(index)
+ else:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ parser.set(pipeline, k, value)
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+ def vpe_downstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_downstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ index = intf['dpdk_port_num']
+ peer_index = intf['peer_intf']['dpdk_port_num']
+ if k == "pktq_in":
+ if "RXQ" not in v:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "RXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "RXQ{0}.0".format(peer_index)
+ parser.set(pipeline, k, value)
+ if k == "pktq_out":
+ if "TXQ" not in v:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "TXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "TXQ{0}.0".format(peer_index)
+ parser.set(pipeline, k, value)
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+ def create_vpe_config(self, vnf_cfg):
+ config = configparser.ConfigParser()
+ vpe_cfg = os.path.join("/tmp/vpe_config")
+ with open(vpe_cfg, 'w') as cfg_file:
+ config = self.vpe_initialize(config)
+ config = self.vpe_rxq(config)
+ config.write(cfg_file)
+ for index, priv_port in enumerate(self.priv_ports):
+ config = self.vpe_upstream(vnf_cfg, priv_port)
+ config.write(cfg_file)
+ config = self.vpe_downstream(vnf_cfg, priv_port)
+ config = self.vpe_tmq(config, index)
+ config.write(cfg_file)
+ def generate_vpe_script(self, interfaces):
+ rules = PipelineRules(pipeline_id=1)
+ for priv_port, pub_port in zip(self.priv_ports, self.pub_ports):
+ priv_intf = interfaces[priv_port]["virtual-interface"]
+ pub_intf = interfaces[pub_port]["virtual-interface"]
+ dst_port0_ip = priv_intf["dst_ip"]
+ dst_port1_ip = pub_intf["dst_ip"]
+ dst_port0_mac = priv_intf["dst_mac"]
+ dst_port1_mac = pub_intf["dst_mac"]
+ rules.add_firewall_script(dst_port0_ip)
+ rules.next_pipeline()
+ rules.add_flow_classification_script()
+ rules.next_pipeline()
+ rules.add_flow_action()
+ rules.next_pipeline()
+ rules.add_flow_action2()
+ rules.next_pipeline()
+ rules.add_route_script(dst_port1_ip, dst_port1_mac)
+ rules.next_pipeline()
+ rules.add_route_script2(dst_port0_ip, dst_port0_mac)
+ rules.next_pipeline(num=4)
+ return rules.get_string()
+class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
+ CFG_CONFIG = "/tmp/vpe_config"
+ CFG_SCRIPT = "/tmp/vpe_script"
+ CORES = ['0', '1', '2', '3', '4', '5']
+ def build_config(self):
+ vpe_vars = {
+ "bin_path": self.ssh_helper.bin_path,
+ "socket": self.socket,
+ }
+ all_ports = []
+ priv_ports = []
+ pub_ports = []
+ for interface in self.vnfd_helper.interfaces:
+ all_ports.append(interface['name'])
+ vld_id = interface['virtual-interface']['vld_id']
+ if vld_id.startswith('private'):
+ priv_ports.append(interface)
+ elif vld_id.startswith('public'):
+ pub_ports.append(interface)
+ vpe_conf = ConfigCreate(priv_ports, pub_ports, self.socket)
+ vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg)
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ with open(self.CFG_CONFIG) as handle:
+ vpe_config = handle.read()
+ self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars))
+ vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces)
+ self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars))
+class VpeApproxVnf(SampleVNF):
""" This class handles vPE VNF model-driver definitions """
- def __init__(self, vnfd):
- super(VpeApproxVnf, self).__init__(vnfd)
- self.socket = None
- self.q_in = Queue()
- self.q_out = Queue()
- self.vnf_cfg = None
- self._vnf_process = None
- self.connection = None
- self.resource = None
- def _resource_collect_start(self):
- self.resource.initiate_systemagent(self.bin_path)
- self.resource.start()
+ APP_NAME = 'vPE_vnf'
+ APP_WORD = 'vpe'
+ WAIT_TIME = 20
- def _resource_collect_stop(self):
- self.resource.stop()
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = VpeApproxSetupEnvHelper
- def _collect_resource_kpi(self):
- result = {}
+ super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
- status = self.resource.check_if_sa_running("collectd")[0]
- if status:
- result = self.resource.amqp_collect_nfvi_kpi()
- result = {"core": result}
- return result
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
- def _get_cpu_sibling_list(self):
- cpu_topo = []
- for core in CORES:
- sys_cmd = \
- "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \
- % core
- cpuid = \
- self.connection.execute("awk -F: '{ print $1 }' < %s" %
- sys_cmd)[1]
- cpu_topo += \
- [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')]
- return [cpu.strip() for cpu in cpu_topo]
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(VpeApproxVnf, self).scale(flavor)
- def instantiate(self, scenario_cfg, context_cfg):
- vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg']
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
- self.setup_vnf_environment(self.connection)
- cores = self._get_cpu_sibling_list()
- self.resource = ResourceProfile(self.vnfd, cores)
- self.connection.execute("pkill vPE_vnf")
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
- interfaces = self.vnfd["vdu"][0]['external-interface']
- self.socket = \
- next((0 for v in interfaces
- if v['virtual-interface']["vpci"][5] == "0"), 1)
- bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
- for vpci in bound_pci:
- self.connection.execute(
- "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci))
- queue_wrapper = \
- QueueFileWrapper(self.q_in, self.q_out, "pipeline>")
- self._vnf_process = multiprocessing.Process(target=self._run_vpe,
- args=(queue_wrapper,
- vnf_cfg,))
- self._vnf_process.start()
- buf = []
- time.sleep(WAIT_TIME) # Give some time for config to load
- while True:
- message = ''
- while self.q_out.qsize() > 0:
- buf.append(self.q_out.get())
- message = ''.join(buf)
- if "pipeline>" in message:
- LOG.info("VPE VNF is up and running.")
- queue_wrapper.clear()
- self._resource_collect_start()
- return self._vnf_process.exitcode
- if "PANIC" in message:
- raise RuntimeError("Error starting vPE VNF.")
- LOG.info("Waiting for VNF to start.. ")
- time.sleep(3)
- if not self._vnf_process.is_alive():
- raise RuntimeError("vPE VNF process died.")
- def _get_ports_gateway(self, name):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
- for route in routing_table:
- if name == route['if']:
- return route['gateway']
- def terminate(self):
- self.execute_command("quit")
- if self._vnf_process:
- self._vnf_process.terminate()
- def _run_vpe(self, filewrapper, vnf_cfg):
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
- interfaces = self.vnfd["vdu"][0]['external-interface']
- port0_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"],
- interfaces[0]["virtual-interface"]["netmask"])))
- port1_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"],
- interfaces[1]["virtual-interface"]["netmask"])))
- dst_port0_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"],
- interfaces[0]["virtual-interface"]["netmask"]))
- dst_port1_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"],
- interfaces[1]["virtual-interface"]["netmask"]))
- vpe_vars = {"port0_local_ip": port0_ip.ip.exploded,
- "port0_dst_ip": dst_port0_ip.ip.exploded,
- "port0_local_ip_hex":
- self._ip_to_hex(port0_ip.ip.exploded),
- "port0_prefixlen": port0_ip.network.prefixlen,
- "port0_netmask": port0_ip.network.netmask.exploded,
- "port0_netmask_hex":
- self._ip_to_hex(port0_ip.network.netmask.exploded),
- "port0_local_mac":
- interfaces[0]["virtual-interface"]["local_mac"],
- "port0_dst_mac":
- interfaces[0]["virtual-interface"]["dst_mac"],
- "port0_gateway":
- self._get_ports_gateway(interfaces[0]["name"]),
- "port0_local_network":
- port0_ip.network.network_address.exploded,
- "port0_prefix": port0_ip.network.prefixlen,
- "port1_local_ip": port1_ip.ip.exploded,
- "port1_dst_ip": dst_port1_ip.ip.exploded,
- "port1_local_ip_hex":
- self._ip_to_hex(port1_ip.ip.exploded),
- "port1_prefixlen": port1_ip.network.prefixlen,
- "port1_netmask": port1_ip.network.netmask.exploded,
- "port1_netmask_hex":
- self._ip_to_hex(port1_ip.network.netmask.exploded),
- "port1_local_mac":
- interfaces[1]["virtual-interface"]["local_mac"],
- "port1_dst_mac":
- interfaces[1]["virtual-interface"]["dst_mac"],
- "port1_gateway":
- self._get_ports_gateway(interfaces[1]["name"]),
- "port1_local_network":
- port1_ip.network.network_address.exploded,
- "port1_prefix": port1_ip.network.prefixlen,
- "port0_local_ip6": self._get_port0localip6(),
- "port1_local_ip6": self._get_port1localip6(),
- "port0_prefixlen6": self._get_port0prefixlen6(),
- "port1_prefixlen6": self._get_port1prefixlen6(),
- "port0_gateway6": self._get_port0gateway6(),
- "port1_gateway6": self._get_port1gateway6(),
- "port0_dst_ip_hex6": self._get_port0localip6(),
- "port1_dst_ip_hex6": self._get_port1localip6(),
- "port0_dst_netmask_hex6": self._get_port0prefixlen6(),
- "port1_dst_netmask_hex6": self._get_port1prefixlen6(),
- "bin_path": self.bin_path,
- "socket": self.socket}
- for cfg in os.listdir(vnf_cfg):
- vpe_config = ""
- with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg:
- vpe_config = vpe_cfg.read()
- self._provide_config_file(cfg, vpe_config, vpe_vars)
- LOG.info("Provision and start the vPE")
- tool_path = provision_tool(self.connection,
- os.path.join(self.bin_path, "vPE_vnf"))
- cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config",
- script="/tmp/vpe_script",
- tool_path=tool_path)
- self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper,
- keep_stdin_open=True, pty=True)
- def _provide_config_file(self, prefix, template, args):
- cfg, cfg_content = tempfile.mkstemp()
- cfg = os.fdopen(cfg, "w+")
- cfg.write(template.format(**args))
- cfg.close()
- cfg_file = "/tmp/%s" % prefix
- self.connection.put(cfg_content, cfg_file)
- return cfg_file
- def execute_command(self, cmd):
- ''' send cmd to vnf process '''
- LOG.info("VPE command: %s", cmd)
- output = []
- if self.q_in:
- self.q_in.put(cmd + "\r\n")
- time.sleep(3)
- while self.q_out.qsize() > 0:
- output.append(self.q_out.get())
- return "".join(output)
+ def get_stats(self, *args, **kwargs):
+ raise NotImplementedError
def collect_kpi(self):
- result = self.get_stats_vpe()
- collect_stats = self._collect_resource_kpi()
- result["collect_stats"] = collect_stats
- LOG.debug("vPE collet Kpis: %s", result)
- return result
- def get_stats_vpe(self):
- ''' get vpe statistics '''
- result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0,
- 'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0}
- up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0',
- 'p 5 stats port out 1']
- down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0']
- pattern = \
- "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \
- "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)"
- for cmd in up_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_up_stream"] = \
- result.get("pkt_in_up_stream", 0) + int(match.group(1))
- result["pkt_drop_up_stream"] = \
- result.get("pkt_drop_up_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
- for cmd in down_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_down_stream"] = \
- result.get("pkt_in_down_stream", 0) + int(match.group(1))
- result["pkt_drop_down_stream"] = \
- result.get("pkt_drop_down_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
+ result = {
+ 'pkt_in_up_stream': 0,
+ 'pkt_drop_up_stream': 0,
+ 'pkt_in_down_stream': 0,
+ 'pkt_drop_down_stream': 0,
+ 'collect_stats': self.resource_helper.collect_kpi(),
+ }
+ indexes_in = [1]
+ indexes_drop = [2, 3]
+ command = 'p {0} stats port {1} 0'
+ for index, direction in ((5, 'up'), (9, 'down')):
+ key_in = "pkt_in_{0}_stream".format(direction)
+ key_drop = "pkt_drop_{0}_stream".format(direction)
+ for mode in ('in', 'out'):
+ stats = self.vnf_execute(command.format(index, mode))
+ match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if not match:
+ continue
+ result[key_in] += sum(int(match.group(x)) for x in indexes_in)
+ result[key_drop] += sum(int(match.group(x)) for x in indexes_drop)
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
return result
diff --git a/yardstick/network_services/vnf_generic/vnfdgen.py b/yardstick/network_services/vnf_generic/vnfdgen.py
index b56a91915..0120b493e 100644
--- a/yardstick/network_services/vnf_generic/vnfdgen.py
+++ b/yardstick/network_services/vnf_generic/vnfdgen.py
@@ -14,11 +14,16 @@
""" Generic file to map and build vnf discriptor """
from __future__ import absolute_import
-import collections
+from functools import reduce
import jinja2
+import logging
import yaml
+from yardstick.common.utils import try_int
+LOG = logging.getLogger(__name__)
def render(vnf_model, **kwargs):
"""Render jinja2 VNF template
@@ -40,7 +45,8 @@ def generate_vnfd(vnf_model, node):
as input for GenericVNF.__init__
# get is unused as global method inside template
- node["get"] = get
+ # node["get"] = key_flatten_get
+ node["get"] = deepgetitem
# Set Node details to default if not defined in pod file
# we CANNOT use TaskTemplate.render because it does not allow
# for missing variables, we need to allow password for key_filename
@@ -52,36 +58,34 @@ def generate_vnfd(vnf_model, node):
return filled_vnfd
-def dict_key_flatten(data):
- """ Convert nested dict structure to dotted key
- (e.g. {"a":{"b":1}} -> {"a.b":1}
- :param data: nested dictionary
- :return: flat dicrionary
- """
- next_data = {}
- # check for non-string iterables
- if not any((isinstance(v, collections.Iterable) and not isinstance(v, str))
- for v in data.values()):
- return data
+# dict_flatten was causing recursion errors with Jinja2 so we removed and replaced
+# which this function from stackoverflow that doesn't require generating entire dictionaries
+# each time we query a key
+def deepgetitem(obj, item, default=None):
+ """Steps through an item chain to get the ultimate value.
- for key, val in data.items():
- if isinstance(val, collections.Mapping):
- for n_k, n_v in val.items():
- next_data["%s.%s" % (key, n_k)] = n_v
- elif isinstance(val, collections.Iterable) and not isinstance(val,
- str):
- for index, item in enumerate(val):
- next_data["%s%d" % (key, index)] = item
- else:
- next_data[key] = val
+ If ultimate value or path to value does not exist, does not raise
+ an exception and instead returns `fallback`.
- return dict_key_flatten(next_data)
+ Based on
+ https://stackoverflow.com/a/38623359
+ https://stackoverflow.com/users/1820042/donny-winston
+ add try_int to work with sequences
-def get(obj, key, *args):
- """ Get template key from dictionary, get default value or raise an exception
+ >>> d = {'snl_final': {'about': {'_icsd': {'icsd_id': 1, 'fr': [2, 3]}}}}
+ >>> deepgetitem(d, 'snl_final.about._icsd.icsd_id')
+ 1
+ >>> deepgetitem(d, 'snl_final.about._sandbox.sbx_id')
+ >>>
+ >>> deepgetitem(d, 'snl_final.about._icsd.fr.1')
+ 3
- data = dict_key_flatten(obj)
- return data.get(key, *args)
+ def getitem(obj, name):
+ # if integer then list index
+ name = try_int(name)
+ try:
+ return obj[name]
+ except (KeyError, TypeError, IndexError):
+ return default
+ return reduce(getitem, item.split('.'), obj)
diff --git a/yardstick/network_services/yang_model.py b/yardstick/network_services/yang_model.py
new file mode 100644
index 000000000..fbf224bd8
--- /dev/null
+++ b/yardstick/network_services/yang_model.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2017 Intel Corporation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import absolute_import
+from __future__ import print_function
+import logging
+import ipaddress
+import yaml
+import six
+LOG = logging.getLogger(__name__)
+class YangModel(object):
+ RULE_TEMPLATE = "p acl add 1 {0} {1} {2} {3} {4} {5} {6} {7} 0 0 {8}"
+ def __init__(self, config_file):
+ super(YangModel, self).__init__()
+ self._config_file = config_file
+ self._options = {}
+ self._rules = ''
+ @property
+ def config_file(self):
+ return self._config_file
+ @config_file.setter
+ def config_file(self, value):
+ self._config_file = value
+ self._options = {}
+ self._rules = ''
+ def _read_config(self):
+ # TODO: add some error handling in case of empty or non-existing file
+ try:
+ with open(self._config_file) as f:
+ self._options = yaml.safe_load(f)
+ except Exception as e:
+ LOG.exception("Failed to load the yaml %s", e)
+ raise
+ def _get_entries(self):
+ if not self._options:
+ return ''
+ rule_list = []
+ for ace in self._options['access-list1']['acl']['access-list-entries']:
+ # TODO: resolve ports using topology file and nodes'
+ # ids: public or private.
+ matches = ace['ace']['matches']
+ dst_ipv4_net = matches['destination-ipv4-network']
+ dst_ipv4_net_ip = ipaddress.ip_interface(six.text_type(dst_ipv4_net))
+ port0_local_network = dst_ipv4_net_ip.network.network_address.exploded
+ port0_prefix = dst_ipv4_net_ip.network.prefixlen
+ src_ipv4_net = matches['source-ipv4-network']
+ src_ipv4_net_ip = ipaddress.ip_interface(six.text_type(src_ipv4_net))
+ port1_local_network = src_ipv4_net_ip.network.network_address.exploded
+ port1_prefix = src_ipv4_net_ip.network.prefixlen
+ lower_dport = matches['destination-port-range']['lower-port']
+ upper_dport = matches['destination-port-range']['upper-port']
+ lower_sport = matches['source-port-range']['lower-port']
+ upper_sport = matches['source-port-range']['upper-port']
+ # TODO: proto should be read from file also.
+ # Now all rules in sample ACL file are TCP.
+ rule_list.append('') # get an extra new line
+ rule_list.append(self.RULE_TEMPLATE.format(port0_local_network,
+ port0_prefix,
+ port1_local_network,
+ port1_prefix,
+ lower_dport,
+ upper_dport,
+ lower_sport,
+ upper_sport,
+ 0))
+ rule_list.append(self.RULE_TEMPLATE.format(port1_local_network,
+ port1_prefix,
+ port0_local_network,
+ port0_prefix,
+ lower_sport,
+ upper_sport,
+ lower_dport,
+ upper_dport,
+ 1))
+ self._rules = '\n'.join(rule_list)
+ def get_rules(self):
+ if not self._rules:
+ self._read_config()
+ self._get_entries()
+ return self._rules