diff options
Diffstat (limited to 'yardstick/network_services')
-rw-r--r-- | yardstick/network_services/helpers/__init__.py | 0 | ||||
-rw-r--r-- | yardstick/network_services/helpers/cpu.py | 76 | ||||
-rw-r--r-- | yardstick/network_services/helpers/samplevnf_helper.py | 639 | ||||
-rw-r--r-- | yardstick/network_services/pipeline.py | 113 | ||||
-rw-r--r-- | yardstick/network_services/traffic_profile/rfc2544.py | 210 | ||||
-rw-r--r-- | yardstick/network_services/traffic_profile/traffic_profile.py | 13 | ||||
-rw-r--r-- | yardstick/network_services/utils.py | 8 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/base.py | 234 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/sample_vnf.py | 994 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/tg_ping.py | 155 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py | 303 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/tg_trex.py | 345 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/vpe_vnf.py | 545 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnfdgen.py | 64 | ||||
-rw-r--r-- | yardstick/network_services/yang_model.py | 107 |
15 files changed, 2673 insertions, 1133 deletions
diff --git a/yardstick/network_services/helpers/__init__.py b/yardstick/network_services/helpers/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/yardstick/network_services/helpers/__init__.py diff --git a/yardstick/network_services/helpers/cpu.py b/yardstick/network_services/helpers/cpu.py new file mode 100644 index 000000000..a5ba6c31e --- /dev/null +++ b/yardstick/network_services/helpers/cpu.py @@ -0,0 +1,76 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class CpuSysCores(object): + + def __init__(self, connection=""): + self.core_map = {} + self.connection = connection + + def _open_cpuinfo(self): + lines = [] + lines = self.connection.execute("cat /proc/cpuinfo")[1].split(u'\n') + return lines + + def _get_core_details(self, lines): + core_details = [] + core_lines = {} + for line in lines: + if line.strip(): + name, value = line.split(":", 1) + core_lines[name.strip()] = value.strip() + else: + core_details.append(core_lines) + core_lines = {} + + return core_details + + def get_core_socket(self): + lines = self.connection.execute("lscpu")[1].split(u'\n') + num_cores = self._get_core_details(lines) + for num in num_cores: + self.core_map["cores_per_socket"] = num["Core(s) per socket"] + self.core_map["thread_per_core"] = num["Thread(s) per core"] + + lines = self._open_cpuinfo() + core_details = self._get_core_details(lines) + for core in core_details: + for k, v in core.items(): + if k == "physical id": + if core["physical id"] not in self.core_map: + self.core_map[core['physical id']] = [] + self.core_map[core['physical id']].append( + core["processor"]) + + return self.core_map + + def validate_cpu_cfg(self, vnf_cfg=None): + if vnf_cfg is None: + vnf_cfg = { + 'lb_config': 'SW', + 'lb_count': 1, + 'worker_config': '1C/1T', + 'worker_threads': 1 + } + if self.core_map["thread_per_core"] == 1 and \ + vnf_cfg["worker_config"] == "1C/2T": + return -1 + + if vnf_cfg['lb_config'] == 'SW': + num_cpu = int(vnf_cfg["worker_threads"]) + 5 + if int(self.core_map["cores_per_socket"]) < num_cpu: + return -1 + + return 0 diff --git a/yardstick/network_services/helpers/samplevnf_helper.py b/yardstick/network_services/helpers/samplevnf_helper.py new file mode 100644 index 000000000..1eefc5ffa --- /dev/null +++ b/yardstick/network_services/helpers/samplevnf_helper.py @@ -0,0 +1,639 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import ipaddress +import logging +import os +import sys +from collections import OrderedDict, defaultdict +from itertools import chain + +import six +from six.moves.configparser import ConfigParser + +from yardstick.common.utils import ip_to_hex + +LOG = logging.getLogger(__name__) + +LINK_CONFIG_TEMPLATE = """\ +link {0} down +link {0} config {1} {2} +link {0} up +""" + +ACTION_TEMPLATE = """\ +p action add {0} accept +p action add {0} fwd +p action add {0} count +""" + +FW_ACTION_TEMPLATE = """\ +p action add {0} accept +p action add {0} fwd +p action add {0} count +p action add {0} conntrack +""" + +# This sets up a basic passthrough with no rules +SCRIPT_TPL = """ +{link_config} + +{arp_config} + +{arp_config6} + +{actions} + +{rules} + +""" + + +class MultiPortConfig(object): + + HW_LB = "HW" + + @staticmethod + def float_x_plus_one_tenth_of_y(x, y): + return float(x) + float(y) / 10.0 + + @staticmethod + def make_str(base, iterator): + return ' '.join((base.format(x) for x in iterator)) + + @classmethod + def make_range_str(cls, base, start, stop=0, offset=0): + if offset and not stop: + stop = start + offset + return cls.make_str(base, range(start, stop)) + + @staticmethod + def parser_get(parser, section, key, default=None): + if parser.has_option(section, key): + return parser.get(section, key) + return default + + @staticmethod + def make_ip_addr(ip, mask_len): + try: + return ipaddress.ip_interface(six.text_type('/'.join([ip, mask_len]))) + except ValueError: + # None so we can skip later + return None + + @classmethod + def validate_ip_and_prefixlen(cls, ip_addr, prefixlen): + ip_addr = cls.make_ip_addr(ip_addr, prefixlen) + return ip_addr.ip.exploded, ip_addr.network.prefixlen + + def __init__(self, topology_file, config_tpl, tmp_file, interfaces=None, + vnf_type='CGNAT', lb_count=2, worker_threads=3, + worker_config='1C/1T', lb_config='SW', socket=0): + + super(MultiPortConfig, self).__init__() + self.topology_file = topology_file + self.worker_config = worker_config.split('/')[1].lower() + self.worker_threads = self.get_worker_threads(worker_threads) + self.vnf_type = vnf_type + self.pipe_line = 0 + self.interfaces = interfaces if interfaces else {} + self.networks = {} + self.write_parser = ConfigParser() + self.read_parser = ConfigParser() + self.read_parser.read(config_tpl) + self.master_core = self.read_parser.get("PIPELINE0", "core") + self.master_tpl = self.get_config_tpl_data('MASTER') + self.arpicmp_tpl = self.get_config_tpl_data('ARPICMP') + self.txrx_tpl = self.get_config_tpl_data('TXRX') + self.loadb_tpl = self.get_config_tpl_data('LOADB') + self.vnf_tpl = self.get_config_tpl_data(vnf_type) + self.swq = 0 + self.lb_count = int(lb_count) + self.lb_config = lb_config + self.tmp_file = os.path.join("/tmp", tmp_file) + self.pktq_out_os = [] + self.socket = socket + self.start_core = "" + self.pipeline_counter = "" + self.txrx_pipeline = "" + self.port_pair_list = [] + self.lb_to_port_pair_mapping = {} + self.init_eal() + + self.lb_index = None + self.mul = 0 + self.port_pairs = [] + self.port_pair_list = [] + self.ports_len = 0 + self.prv_que_handler = None + self.vnfd = None + self.rules = None + self.pktq_out = '' + + @staticmethod + def gen_core(core): + # return "s{}c{}".format(self.socket, core) + # don't use sockets for VNFs, because we don't want to have to + # adjust VM CPU topology. It is virtual anyway + return str(core) + + def make_port_pairs_iter(self, operand, iterable): + return (operand(x[-1], y) for y in iterable for x in chain(*self.port_pairs)) + + def make_range_port_pairs_iter(self, operand, start, end): + return self.make_port_pairs_iter(operand, range(start, end)) + + def init_eal(self): + vpci = [v['virtual-interface']["vpci"] for v in self.interfaces] + with open(self.tmp_file, 'w') as fh: + fh.write('[EAL]\n') + for item in vpci: + fh.write('w = {0}\n'.format(item)) + fh.write('\n') + + def update_timer(self): + timer_tpl = self.get_config_tpl_data('TIMER') + timer_tpl['core'] = self.gen_core(self.start_core) + self.update_write_parser(timer_tpl) + self.start_core += 1 + + def get_config_tpl_data(self, type_value): + for section in self.read_parser.sections(): + if self.read_parser.has_option(section, 'type'): + if type_value == self.read_parser.get(section, 'type'): + tpl = OrderedDict(self.read_parser.items(section)) + return tpl + + def get_txrx_tpl_data(self, value): + for section in self.read_parser.sections(): + if self.read_parser.has_option(section, 'pipeline_txrx_type'): + if value == self.read_parser.get(section, 'pipeline_txrx_type'): + tpl = OrderedDict(self.read_parser.items(section)) + return tpl + + def init_write_parser_template(self, type_value='ARPICMP'): + for section in self.read_parser.sections(): + if type_value == self.parser_get(self.read_parser, section, 'type', object()): + self.start_core = self.read_parser.getint(section, 'core') + self.pipeline_counter = self.read_parser.getint(section, 'core') + self.txrx_pipeline = self.read_parser.getint(section, 'core') + return + self.write_parser.add_section(section) + for name, value in self.read_parser.items(section): + self.write_parser.set(section, name, value) + + def update_write_parser(self, data): + section = "PIPELINE{0}".format(self.pipeline_counter) + self.write_parser.add_section(section) + for name, value in data.items(): + self.write_parser.set(section, name, value) + + def get_worker_threads(self, worker_threads): + if self.worker_config == '1t': + return worker_threads + else: + return worker_threads - worker_threads % 2 + + def generate_next_core_id(self): + if self.worker_config == '1t': + self.start_core += 1 + return + + try: + self.start_core = 'h{}'.format(int(self.start_core)) + except ValueError: + self.start_core = int(self.start_core[:-1]) + 1 + + @staticmethod + def get_port_pairs(interfaces): + port_pair_list = [] + networks = defaultdict(list) + for private_intf in interfaces: + vintf = private_intf['virtual-interface'] + networks[vintf['vld_id']].append(vintf) + + for name, net in networks.items(): + # partition returns a tuple + parts = list(name.partition('private')) + if parts[0]: + # 'private' was not in or not leftmost in the string + continue + parts[1] = 'public' + public_id = ''.join(parts) + for private_intf in net: + try: + public_peer_intfs = networks[public_id] + except KeyError: + LOG.warning("private network without peer %s, %s not found", name, public_id) + continue + + for public_intf in public_peer_intfs: + port_pair = private_intf["ifname"], public_intf["ifname"] + port_pair_list.append(port_pair) + + return port_pair_list, networks + + def get_lb_count(self): + self.lb_count = int(min(len(self.port_pair_list), self.lb_count)) + + def generate_lb_to_port_pair_mapping(self): + self.lb_to_port_pair_mapping = defaultdict(int) + port_pair_count = len(self.port_pair_list) + lb_pair_count = int(port_pair_count / self.lb_count) + for i in range(self.lb_count): + self.lb_to_port_pair_mapping[i + 1] = lb_pair_count + for i in range(port_pair_count % self.lb_count): + self.lb_to_port_pair_mapping[i + 1] += 1 + + def set_priv_to_pub_mapping(self): + return "".join(str(y) for y in [(int(x[0][-1]), int(x[1][-1])) for x in + self.port_pair_list]) + + def set_priv_que_handler(self): + # iterated twice, can't be generator + priv_to_pub_map = [(int(x[0][-1]), int(x[1][-1])) for x in self.port_pairs] + # must be list to use .index() + port_list = list(chain.from_iterable(priv_to_pub_map)) + priv_ports = (x[0] for x in priv_to_pub_map) + self.prv_que_handler = '({})'.format( + ",".join((str(port_list.index(x)) for x in priv_ports))) + + def generate_arp_route_tbl(self): + arp_config = [] + arp_route_tbl_tmpl = "({port0_dst_ip_hex},{port0_netmask_hex},{port_num}," \ + "{next_hop_ip_hex})" + for port_pair in self.port_pair_list: + for port in port_pair: + port_num = int(port[-1]) + interface = self.interfaces[port_num] + # port0_ip = ipaddress.ip_interface(six.text_type( + # "%s/%s" % (interface["virtual-interface"]["local_ip"], + # interface["virtual-interface"]["netmask"]))) + dst_port0_ip = \ + ipaddress.ip_interface(six.text_type( + "%s/%s" % (interface["virtual-interface"]["dst_ip"], + interface["virtual-interface"]["netmask"]))) + arp_vars = { + "port0_dst_ip_hex": ip_to_hex(dst_port0_ip.ip.exploded), + "port0_netmask_hex": ip_to_hex(dst_port0_ip.network.netmask.exploded), + "port_num": port_num, + # next hop is dst in this case + "next_hop_ip_hex": ip_to_hex(dst_port0_ip.ip.exploded), + } + arp_config.append(arp_route_tbl_tmpl.format(**arp_vars)) + + return ' '.join(arp_config) + + def generate_arpicmp_data(self): + swq_in_str = self.make_range_str('SWQ{}', self.swq, offset=self.lb_count) + self.swq += self.lb_count + swq_out_str = self.make_range_str('SWQ{}', self.swq, offset=self.lb_count) + self.swq += self.lb_count + mac_iter = (self.interfaces[int(x[-1])]['virtual-interface']['local_mac'] + for port_pair in self.port_pair_list for x in port_pair) + pktq_in_iter = ('RXQ{}'.format(float(x[0][-1])) for x in self.port_pair_list) + + arpicmp_data = { + 'core': self.gen_core(self.start_core), + 'pktq_in': swq_in_str, + 'pktq_out': swq_out_str, + 'ports_mac_list': ' '.join(mac_iter), + 'pktq_in_prv': ' '.join(pktq_in_iter), + 'prv_to_pub_map': self.set_priv_to_pub_mapping(), + 'arp_route_tbl': self.generate_arp_route_tbl(), + # can't use empty string, defaul to () + 'nd_route_tbl': "()", + } + self.pktq_out_os = swq_out_str.split(' ') + # why? + if self.lb_config == self.HW_LB: + arpicmp_data['pktq_in'] = swq_in_str + self.swq = 0 + return arpicmp_data + + def generate_final_txrx_data(self): + swq_start = self.swq - self.ports_len * self.worker_threads + + txq_start = 0 + txq_end = self.worker_threads + + pktq_out_iter = self.make_range_port_pairs_iter(self.float_x_plus_one_tenth_of_y, + txq_start, txq_end) + + swq_str = self.make_range_str('SWQ{}', swq_start, self.swq) + txq_str = self.make_str('TXQ{}', pktq_out_iter) + rxtx_data = { + 'pktq_in': swq_str, + 'pktq_out': txq_str, + 'pipeline_txrx_type': 'TXTX', + 'core': self.gen_core(self.start_core), + } + pktq_in = rxtx_data['pktq_in'] + pktq_in = '{0} {1}'.format(pktq_in, self.pktq_out_os[self.lb_index - 1]) + rxtx_data['pktq_in'] = pktq_in + self.pipeline_counter += 1 + return rxtx_data + + def generate_initial_txrx_data(self): + pktq_iter = self.make_range_port_pairs_iter(self.float_x_plus_one_tenth_of_y, + 0, self.worker_threads) + + rxq_str = self.make_str('RXQ{}', pktq_iter) + swq_str = self.make_range_str('SWQ{}', self.swq, offset=self.ports_len) + txrx_data = { + 'pktq_in': rxq_str, + 'pktq_out': swq_str + ' SWQ{0}'.format(self.lb_index - 1), + 'pipeline_txrx_type': 'RXRX', + 'core': self.gen_core(self.start_core), + } + self.pipeline_counter += 1 + return txrx_data + + def generate_lb_data(self): + pktq_in = self.make_range_str('SWQ{}', self.swq, offset=self.ports_len) + self.swq += self.ports_len + + offset = self.ports_len * self.worker_threads + pktq_out = self.make_range_str('SWQ{}', self.swq, offset=offset) + self.pktq_out = pktq_out.split() + + self.swq += (self.ports_len * self.worker_threads) + lb_data = { + 'prv_que_handler': self.prv_que_handler, + 'pktq_in': pktq_in, + 'pktq_out': pktq_out, + 'n_vnf_threads': str(self.worker_threads), + 'core': self.gen_core(self.start_core), + } + self.pipeline_counter += 1 + return lb_data + + def generate_vnf_data(self): + if self.lb_config == self.HW_LB: + port_iter = self.make_port_pairs_iter(self.float_x_plus_one_tenth_of_y, [self.mul]) + pktq_in = self.make_str('RXQ{}', port_iter) + + self.mul += 1 + port_iter = self.make_port_pairs_iter(self.float_x_plus_one_tenth_of_y, [self.mul]) + pktq_out = self.make_str('TXQ{}', port_iter) + + pipe_line_data = { + 'pktq_in': pktq_in, + 'pktq_out': pktq_out + ' SWQ{0}'.format(self.swq), + 'prv_que_handler': self.prv_que_handler, + 'core': self.gen_core(self.start_core), + } + self.swq += 1 + else: + pipe_line_data = { + 'pktq_in': ' '.join((self.pktq_out.pop(0) for _ in range(self.ports_len))), + 'pktq_out': self.make_range_str('SWQ{}', self.swq, offset=self.ports_len), + 'prv_que_handler': self.prv_que_handler, + 'core': self.gen_core(self.start_core), + } + self.swq += self.ports_len + + if self.vnf_type in ('ACL', 'VFW'): + pipe_line_data.pop('prv_que_handler') + + if self.vnf_tpl.get('vnf_set'): + public_ip_port_range_list = self.vnf_tpl['public_ip_port_range'].split(':') + ip_in_hex = '{:x}'.format(int(public_ip_port_range_list[0], 16) + self.lb_index - 1) + public_ip_port_range_list[0] = ip_in_hex + self.vnf_tpl['public_ip_port_range'] = ':'.join(public_ip_port_range_list) + + self.pipeline_counter += 1 + return pipe_line_data + + def generate_config_data(self): + self.init_write_parser_template() + + # use master core for master, don't use self.start_core + self.write_parser.set('PIPELINE0', 'core', self.gen_core(self.master_core)) + arpicmp_data = self.generate_arpicmp_data() + self.arpicmp_tpl.update(arpicmp_data) + self.update_write_parser(self.arpicmp_tpl) + + self.start_core += 1 + if self.vnf_type == 'CGNAPT': + self.pipeline_counter += 1 + self.update_timer() + + for lb in self.lb_to_port_pair_mapping: + self.lb_index = lb + self.mul = 0 + port_pair_count = self.lb_to_port_pair_mapping[lb] + if not self.port_pair_list: + continue + + self.port_pairs = self.port_pair_list[:port_pair_count] + self.port_pair_list = self.port_pair_list[port_pair_count:] + self.ports_len = port_pair_count * 2 + self.set_priv_que_handler() + if self.lb_config == 'SW': + txrx_data = self.generate_initial_txrx_data() + self.txrx_tpl.update(txrx_data) + self.update_write_parser(self.txrx_tpl) + self.start_core += 1 + lb_data = self.generate_lb_data() + self.loadb_tpl.update(lb_data) + self.update_write_parser(self.loadb_tpl) + self.start_core += 1 + + for i in range(self.worker_threads): + vnf_data = self.generate_vnf_data() + if not self.vnf_tpl: + self.vnf_tpl = {} + self.vnf_tpl.update(vnf_data) + self.update_write_parser(self.vnf_tpl) + try: + self.vnf_tpl.pop('vnf_set') + except KeyError: + pass + else: + self.vnf_tpl.pop('public_ip_port_range') + self.generate_next_core_id() + + if self.lb_config == 'SW': + txrx_data = self.generate_final_txrx_data() + self.txrx_tpl.update(txrx_data) + self.update_write_parser(self.txrx_tpl) + self.start_core += 1 + self.vnf_tpl = self.get_config_tpl_data(self.vnf_type) + + def generate_config(self): + self.port_pair_list, self.networks = self.get_port_pairs(self.interfaces) + self.get_lb_count() + self.generate_lb_to_port_pair_mapping() + self.generate_config_data() + self.write_parser.write(sys.stdout) + with open(self.tmp_file, 'a') as tfh: + self.write_parser.write(tfh) + + def generate_link_config(self): + + link_configs = [] + for port_pair in self.port_pair_list: + for port in port_pair: + port = port[-1] + virtual_interface = self.interfaces[int(port)]["virtual-interface"] + local_ip = virtual_interface["local_ip"] + netmask = virtual_interface["netmask"] + port_ip, prefix_len = self.validate_ip_and_prefixlen(local_ip, netmask) + link_configs.append(LINK_CONFIG_TEMPLATE.format(port, port_ip, prefix_len)) + + return ''.join(link_configs) + + def get_route_data(self, src_key, data_key, port): + route_list = self.vnfd['vdu'][0].get(src_key, []) + return next((route[data_key] for route in route_list if route['if'] == port), None) + + def get_ports_gateway(self, port): + return self.get_route_data('routing_table', 'gateway', port) + + def get_ports_gateway6(self, port): + return self.get_route_data('nd_route_tbl', 'gateway', port) + + def get_netmask_gateway(self, port): + return self.get_route_data('routing_table', 'netmask', port) + + def get_netmask_gateway6(self, port): + return self.get_route_data('nd_route_tbl', 'netmask', port) + + def generate_arp_config(self): + arp_config = [] + for port_pair in self.port_pair_list: + for port in port_pair: + gateway = self.get_ports_gateway(port) + # omit entries with no gateway + if not gateway: + continue + dst_mac = self.interfaces[int(port[-1])]["virtual-interface"]["dst_mac"] + arp_config.append((port[-1], gateway, dst_mac, self.txrx_pipeline)) + + return '\n'.join(('p {3} arpadd {0} {1} {2}'.format(*values) for values in arp_config)) + + def generate_arp_config6(self): + arp_config6 = [] + for port_pair in self.port_pair_list: + for port in port_pair: + gateway6 = self.get_ports_gateway6(port) + # omit entries with no gateway + if not gateway6: + continue + dst_mac6 = self.interfaces[int(port[-1])]["virtual-interface"]["dst_mac"] + arp_config6.append((port[-1], gateway6, dst_mac6, self.txrx_pipeline)) + + return '\n'.join(('p {3} arpadd {0} {1} {2}'.format(*values) for values in arp_config6)) + + def generate_action_config(self): + port_list = [] + for port_pair in self.port_pair_list: + for port in port_pair: + port_list.append(port[-1]) + + if self.vnf_type == "VFW": + template = FW_ACTION_TEMPLATE + else: + template = ACTION_TEMPLATE + + return ''.join((template.format(port) for port in port_list)) + + def get_ip_from_port(self, port): + return self.make_ip_addr(self.get_ports_gateway(port), self.get_netmask_gateway(port)) + + def get_ip_and_prefixlen_from_ip_of_port(self, port): + ip_addr = self.get_ip_from_port(port) + # handle cases with no gateway + if ip_addr: + return ip_addr.ip.exploded, ip_addr.network.prefixlen + else: + return None, None + + def generate_rule_config(self): + cmd = 'acl' if self.vnf_type == "ACL" else "vfw" + rules_config = self.rules if self.rules else '' + new_rules = [] + new_ipv6_rules = [] + pattern = 'p {0} add {1} {2} {3} {4} {5} 0 65535 0 65535 0 0 {6}' + for port_pair in self.port_pair_list: + src_port = int(port_pair[0][-1]) + dst_port = int(port_pair[1][-1]) + + src_ip, src_prefix_len = self.get_ip_and_prefixlen_from_ip_of_port(port_pair[0]) + dst_ip, dst_prefix_len = self.get_ip_and_prefixlen_from_ip_of_port(port_pair[1]) + # ignore entires with empty values + if all((src_ip, src_prefix_len, dst_ip, dst_prefix_len)): + new_rules.append((cmd, self.txrx_pipeline, src_ip, src_prefix_len, + dst_ip, dst_prefix_len, dst_port)) + new_rules.append((cmd, self.txrx_pipeline, dst_ip, dst_prefix_len, + src_ip, src_prefix_len, src_port)) + + src_ip = self.get_ports_gateway6(port_pair[0]) + src_prefix_len = self.get_netmask_gateway6(port_pair[0]) + dst_ip = self.get_ports_gateway6(port_pair[1]) + dst_prefix_len = self.get_netmask_gateway6(port_pair[0]) + # ignore entires with empty values + if all((src_ip, src_prefix_len, dst_ip, dst_prefix_len)): + new_ipv6_rules.append((cmd, self.txrx_pipeline, src_ip, src_prefix_len, + dst_ip, dst_prefix_len, dst_port)) + new_ipv6_rules.append((cmd, self.txrx_pipeline, dst_ip, dst_prefix_len, + src_ip, src_prefix_len, src_port)) + + acl_apply = "\np %s applyruleset" % cmd + new_rules_config = '\n'.join(pattern.format(*values) for values + in chain(new_rules, new_ipv6_rules)) + return ''.join([rules_config, new_rules_config, acl_apply]) + + def generate_script_data(self): + self.port_pair_list, self.networks = self.get_port_pairs(self.interfaces) + self.get_lb_count() + script_data = { + 'link_config': self.generate_link_config(), + 'arp_config': self.generate_arp_config(), + 'arp_config6': self.generate_arp_config6(), + 'actions': '', + 'rules': '', + } + + if self.vnf_type in ('ACL', 'VFW'): + script_data.update({ + 'actions': self.generate_action_config(), + 'rules': self.generate_rule_config(), + }) + + return script_data + + def generate_script(self, vnfd, rules=None): + self.vnfd = vnfd + self.rules = rules + script_data = self.generate_script_data() + script = SCRIPT_TPL.format(**script_data) + if self.lb_config == self.HW_LB: + script += 'set fwd rxonly' + hwlb_tpl = """ +set_sym_hash_ena_per_port {0} enable +set_hash_global_config {0} simple_xor ipv4-udp enable +set_sym_hash_ena_per_port {1} enable +set_hash_global_config {1} simple_xor ipv4-udp enable +set_hash_input_set {0} ipv4-udp src-ipv4 udp-src-port add +set_hash_input_set {1} ipv4-udp dst-ipv4 udp-dst-port add +set_hash_input_set {0} ipv6-udp src-ipv6 udp-src-port add +set_hash_input_set {1} ipv6-udp dst-ipv6 udp-dst-port add +""" + for port_pair in self.port_pair_list: + script += hwlb_tpl.format(port_pair[0][-1], port_pair[1][-1]) + return script diff --git a/yardstick/network_services/pipeline.py b/yardstick/network_services/pipeline.py new file mode 100644 index 000000000..d781ba0cd --- /dev/null +++ b/yardstick/network_services/pipeline.py @@ -0,0 +1,113 @@ +# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import print_function
+import itertools
+
+from six.moves import zip
+
+FIREWALL_ADD_DEFAULT = "p {0} firewall add default 1"
+FIREWALL_ADD_PRIO = """\
+p {0} firewall add priority 1 ipv4 {1} 24 0.0.0.0 0 0 65535 0 65535 6 0xFF port 0"""
+
+FLOW_ADD_QINQ_RULES = """\
+p {0} flow add qinq 128 512 port 0 id 1
+p {0} flow add default 1"""
+
+ACTION_FLOW_BULK = "p {0} action flow bulk /tmp/action_bulk_512.txt"
+ACTION_DSCP_CLASS_COLOR = "p {0} action dscp {1} class {2} color {3}"
+ROUTE_ADD_DEFAULT = "p {0} route add default 1"
+ROUTE_ADD_ETHER_QINQ = 'p {0} route add {1} {2} port 0 ether {3} qinq 0 {4}'
+ROUTE_ADD_ETHER_MPLS = "p {0} route add {1} 21 port 0 ether {2} mpls 0:{3}"
+
+
+class PipelineRules(object):
+
+ def __init__(self, pipeline_id=0):
+ super(PipelineRules, self).__init__()
+ self.rule_list = []
+ self.pipeline_id = pipeline_id
+
+ def __str__(self):
+ return '\n'.join(self.rule_list)
+
+ def get_string(self):
+ return str(self)
+
+ def next_pipeline(self, num=1):
+ self.pipeline_id += num
+
+ def add_newline(self):
+ self.rule_list.append('')
+
+ def add_rule(self, base, *args):
+ self.rule_list.append(base.format(self.pipeline_id, *args))
+
+ def add_firewall_prio(self, ip):
+ self.add_rule(FIREWALL_ADD_PRIO, ip)
+
+ def add_firewall_script(self, ip):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ for i in range(256):
+ ip_addr[-2] = str(i)
+ ip = '.'.join(ip_addr)
+ self.add_firewall_prio(ip)
+ self.add_rule(FIREWALL_ADD_DEFAULT)
+ self.add_newline()
+
+ def add_flow_classification_script(self):
+ self.add_rule(FLOW_ADD_QINQ_RULES)
+
+ def add_flow_action(self):
+ self.add_rule(ACTION_FLOW_BULK)
+
+ def add_dscp_class_color(self, dscp, color):
+ self.add_rule(ACTION_DSCP_CLASS_COLOR, dscp, dscp % 4, color)
+
+ def add_flow_action2(self):
+ self.add_rule(ACTION_FLOW_BULK)
+ for dscp, color in zip(range(64), itertools.cycle('GYR')):
+ self.add_dscp_class_color(dscp, color)
+
+ def add_route_ether_mpls(self, ip, mac_addr, index):
+ self.add_rule(ROUTE_ADD_ETHER_MPLS, ip, mac_addr, index)
+
+ def add_route_script(self, ip, mac_addr):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ for index in range(0, 256, 8):
+ ip_addr[-2] = str(index)
+ ip = '.'.join(ip_addr)
+ self.add_route_ether_mpls(ip, mac_addr, index)
+ self.add_rule(ROUTE_ADD_DEFAULT)
+ self.add_newline()
+
+ def add_ether_qinq(self, ip, mask, mac_addr, index):
+ self.add_rule(ROUTE_ADD_ETHER_QINQ, ip, mask, mac_addr, index)
+
+ def add_route_script2(self, ip, mac_addr):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ mask = 24
+ for i in range(0, 256):
+ ip_addr[-2] = str(i)
+ ip = '.'.join(ip_addr)
+ self.add_ether_qinq(ip, mask, mac_addr, i)
+ self.add_rule(ROUTE_ADD_DEFAULT)
+ self.add_newline()
diff --git a/yardstick/network_services/traffic_profile/rfc2544.py b/yardstick/network_services/traffic_profile/rfc2544.py index 99964d329..b07bc9d5a 100644 --- a/yardstick/network_services/traffic_profile/rfc2544.py +++ b/yardstick/network_services/traffic_profile/rfc2544.py @@ -17,6 +17,10 @@ from __future__ import absolute_import from __future__ import division import logging +from stl.trex_stl_lib.trex_stl_client import STLStream +from stl.trex_stl_lib.trex_stl_streams import STLFlowLatencyStats +from stl.trex_stl_lib.trex_stl_streams import STLTXCont + from yardstick.network_services.traffic_profile.traffic_profile \ import TrexProfile @@ -28,79 +32,175 @@ class RFC2544Profile(TrexProfile): def __init__(self, traffic_generator): super(RFC2544Profile, self).__init__(traffic_generator) + self.generator = None self.max_rate = None self.min_rate = None + self.ports = None self.rate = 100 - self.tmp_drop = None - self.tmp_throughput = None - self.profile_data = None - - def execute(self, traffic_generator): - ''' Generate the stream and run traffic on the given ports ''' - if self.first_run: - self.profile_data = self.params.get('private', '') - ports = [traffic_generator.my_ports[0]] - traffic_generator.client.add_streams(self.get_streams(), - ports=ports[0]) - profile_data = self.params.get('public', '') - if profile_data: - self.profile_data = profile_data - ports.append(traffic_generator.my_ports[1]) - traffic_generator.client.add_streams(self.get_streams(), - ports=ports[1]) + self.drop_percent_at_max_tx = None + self.throughput_max = None - self.max_rate = self.rate - self.min_rate = 0 - traffic_generator.client.start(ports=ports, - mult=self.get_multiplier(), - duration=30, force=True) - self.tmp_drop = 0 - self.tmp_throughput = 0 + def register_generator(self, generator): + self.generator = generator + + def execute(self, traffic_generator=None): + """ Generate the stream and run traffic on the given ports """ + if traffic_generator is not None and self.generator is None: + self.generator = traffic_generator + + if self.ports is not None: + return + + self.ports = [] + priv_ports = self.generator.priv_ports + pub_ports = self.generator.pub_ports + # start from 1 for private_1, public_1, etc. + for index, (priv_port, pub_port) in enumerate(zip(priv_ports, pub_ports), 1): + profile_data = self.params.get('private_{}'.format(index), '') + self.ports.append(priv_port) + # pass profile_data directly, don't use self.profile_data + self.generator.client.add_streams(self.get_streams(profile_data), ports=priv_port) + profile_data = self.params.get('public_{}'.format(index), '') + # correlated traffic doesn't use public traffic? + if not profile_data or self.generator.rfc2544_helper.correlated_traffic: + continue + # just get the pub_port + self.ports.append(pub_port) + self.generator.client.add_streams(self.get_streams(profile_data), ports=pub_port) + + self.max_rate = self.rate + self.min_rate = 0 + self.generator.client.start(ports=self.ports, mult=self.get_multiplier(), + duration=30, force=True) + self.drop_percent_at_max_tx = 0 + self.throughput_max = 0 def get_multiplier(self): - ''' Get the rate at which next iternation to run ''' + """ Get the rate at which next iteration to run """ self.rate = round((self.max_rate + self.min_rate) / 2.0, 2) multiplier = round(self.rate / self.pps, 2) return str(multiplier) - def get_drop_percentage(self, traffic_generator, - samples, tol_min, tolerance): - ''' Calculate the drop percentage and run the traffic ''' - in_packets = sum([samples[iface]['in_packets'] for iface in samples]) - out_packets = sum([samples[iface]['out_packets'] for iface in samples]) + def get_drop_percentage(self, generator=None): + """ Calculate the drop percentage and run the traffic """ + if generator is None: + generator = self.generator + run_duration = self.generator.RUN_DURATION + samples = self.generator.generate_samples() + + in_packets = sum([value['in_packets'] for value in samples.values()]) + out_packets = sum([value['out_packets'] for value in samples.values()]) + packet_drop = abs(out_packets - in_packets) drop_percent = 100.0 try: - drop_percent = round((packet_drop / float(out_packets)) * 100, 2) + drop_percent = round((packet_drop / float(out_packets)) * 100, 5) except ZeroDivisionError: LOGGING.info('No traffic is flowing') - samples['TxThroughput'] = out_packets / 30 - samples['RxThroughput'] = in_packets / 30 - samples['CurrentDropPercentage'] = drop_percent - samples['Throughput'] = self.tmp_throughput - samples['DropPercentage'] = self.tmp_drop - if drop_percent > tolerance and self.tmp_throughput == 0: - samples['Throughput'] = (in_packets / 30) - samples['DropPercentage'] = drop_percent - if self.first_run: - max_supported_rate = out_packets / 30 - self.rate = max_supported_rate + + # TODO(esm): RFC2544 doesn't tolerate packet loss, why do we? + tolerance_low = generator.rfc2544_helper.tolerance_low + tolerance_high = generator.rfc2544_helper.tolerance_high + + tx_rate = out_packets / run_duration + rx_rate = in_packets / run_duration + + throughput_max = self.throughput_max + drop_percent_at_max_tx = self.drop_percent_at_max_tx + + if self.drop_percent_at_max_tx is None: + self.rate = tx_rate self.first_run = False - if drop_percent > tolerance: + + if drop_percent > tolerance_high: + # TODO(esm): why don't we discard results that are out of tolerance? self.max_rate = self.rate - elif drop_percent < tol_min: + if throughput_max == 0: + throughput_max = rx_rate + drop_percent_at_max_tx = drop_percent + + elif drop_percent >= tolerance_low: + # TODO(esm): why do we update the samples dict in this case + # and not update our tracking values? + throughput_max = rx_rate + drop_percent_at_max_tx = drop_percent + + elif drop_percent >= self.drop_percent_at_max_tx: + # TODO(esm): why don't we discard results that are out of tolerance? self.min_rate = self.rate - if drop_percent >= self.tmp_drop: - self.tmp_drop = drop_percent - self.tmp_throughput = (in_packets / 30) - samples['Throughput'] = (in_packets / 30) - samples['DropPercentage'] = drop_percent + self.drop_percent_at_max_tx = drop_percent_at_max_tx = drop_percent + self.throughput_max = throughput_max = rx_rate + else: - samples['Throughput'] = (in_packets / 30) - samples['DropPercentage'] = drop_percent + # TODO(esm): why don't we discard results that are out of tolerance? + self.min_rate = self.rate + + generator.clear_client_stats() + generator.start_client(mult=self.get_multiplier(), + duration=run_duration, force=True) + + # if correlated traffic update the Throughput + if generator.rfc2544_helper.correlated_traffic: + throughput_max *= 2 + + samples.update({ + 'TxThroughput': tx_rate, + 'RxThroughput': rx_rate, + 'CurrentDropPercentage': drop_percent, + 'Throughput': throughput_max, + 'DropPercentage': drop_percent_at_max_tx, + }) - traffic_generator.client.clear_stats(ports=traffic_generator.my_ports) - traffic_generator.client.start(ports=traffic_generator.my_ports, - mult=self.get_multiplier(), - duration=30, force=True) return samples + + def execute_latency(self, generator=None, samples=None): + if generator is None: + generator = self.generator + + if samples is None: + samples = generator.generate_samples() + + self.pps, multiplier = self.calculate_pps(samples) + self.ports = [] + self.pg_id = self.params['traffic_profile'].get('pg_id', 1) + priv_ports = generator.priv_ports + pub_ports = generator.pub_ports + for index, (priv_port, pub_port) in enumerate(zip(priv_ports, pub_ports), 1): + profile_data = self.params.get('private_{}'.format(index), '') + self.ports.append(priv_port) + generator.client.add_streams(self.get_streams(profile_data), + ports=priv_port) + + profile_data = self.params.get('public_{}'.format(index), '') + if not profile_data or generator.correlated_traffic: + continue + + pub_port = generator.pub_ports[index] + self.ports.append(pub_port) + generator.client.add_streams(self.get_streams(profile_data), + ports=pub_port) + + generator.start_client(ports=self.ports, mult=str(multiplier), + duration=120, force=True) + self.first_run = False + + def calculate_pps(self, samples): + pps = round(samples['Throughput'] / 2, 2) + multiplier = round(self.rate / self.pps, 2) + return pps, multiplier + + def create_single_stream(self, packet_size, pps, isg=0): + packet = self._create_single_packet(packet_size) + if pps: + stl_mode = STLTXCont(pps=pps) + else: + stl_mode = STLTXCont(pps=self.pps) + if self.pg_id: + LOGGING.debug("pg_id: %s", self.pg_id) + stl_flow_stats = STLFlowLatencyStats(pg_id=self.pg_id) + stream = STLStream(isg=isg, packet=packet, mode=stl_mode, + flow_stats=stl_flow_stats) + self.pg_id += 1 + else: + stream = STLStream(isg=isg, packet=packet, mode=stl_mode) + return stream diff --git a/yardstick/network_services/traffic_profile/traffic_profile.py b/yardstick/network_services/traffic_profile/traffic_profile.py index 156cc6644..3e1f8d89f 100644 --- a/yardstick/network_services/traffic_profile/traffic_profile.py +++ b/yardstick/network_services/traffic_profile/traffic_profile.py @@ -399,16 +399,19 @@ class TrexProfile(TrafficProfile): logging.debug("Imax: %s rate: %s", imix_count, self.rate) return imix_count - def get_streams(self): - """ generate trex stream """ + def get_streams(self, profile_data): + """ generate trex stream + :param profile_data: + :type profile_data: + """ self.streams = [] self.pps = self.params['traffic_profile'].get('frame_rate', 100) - for packet_name in self.profile_data: - outer_l2 = self.profile_data[packet_name].get('outer_l2') + for packet_name in profile_data: + outer_l2 = profile_data[packet_name].get('outer_l2') imix_data = self.generate_imix_data(outer_l2) if not imix_data: imix_data = {64: self.pps} - self.generate_vm(self.profile_data[packet_name]) + self.generate_vm(profile_data[packet_name]) for size in imix_data: self._generate_streams(size, imix_data[size]) self._generate_profile() diff --git a/yardstick/network_services/utils.py b/yardstick/network_services/utils.py index cb71a6029..38fbda47f 100644 --- a/yardstick/network_services/utils.py +++ b/yardstick/network_services/utils.py @@ -45,17 +45,19 @@ def get_nsb_option(option, default=None): return default -def provision_tool(connection, tool_path): +def provision_tool(connection, tool_path, tool_file=None): """ verify if the tool path exits on the node, if not push the local binary to remote node :return - Tool path """ + if tool_file: + tool_path = os.path.join(tool_path, tool_file) bin_path = get_nsb_option("bin_path") - exit_status, stdout = connection.execute("which %s" % tool_path)[:2] + exit_status, stdout = connection.execute("which %s > /dev/null 2>&1" % tool_path)[:2] if exit_status == 0: - return encodeutils.safe_decode(stdout, incoming='utf-8').rstrip() + return encodeutils.safe_decode(tool_path, incoming='utf-8').rstrip() logging.warning("%s not found on %s, will try to copy from localhost", tool_path, connection.host) diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 2df6037f3..955f9f03d 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -15,10 +15,6 @@ from __future__ import absolute_import import logging -import ipaddress -import six - -from yardstick.network_services.utils import get_nsb_option LOG = logging.getLogger(__name__) @@ -61,192 +57,69 @@ class QueueFileWrapper(object): self.q_out.get() -class GenericVNF(object): +class VnfdHelper(dict): + + @property + def mgmt_interface(self): + return self["mgmt-interface"] + + @property + def vdu(self): + return self['vdu'] + + @property + def vdu0(self): + return self.vdu[0] + + @property + def interfaces(self): + return self.vdu0['external-interface'] + + @property + def kpi(self): + return self['benchmark']['kpi'] + + def find_virtual_interface(self, **kwargs): + key, value = next(iter(kwargs.items())) + for interface in self.interfaces: + virtual_intf = interface["virtual-interface"] + if virtual_intf[key] == value: + return interface + + def find_interface(self, **kwargs): + key, value = next(iter(kwargs.items())) + for interface in self.interfaces: + if interface[key] == value: + return interface + + +class VNFObject(object): + + def __init__(self, name, vnfd): + super(VNFObject, self).__init__() + self.name = name + self.vnfd_helper = VnfdHelper(vnfd) # fixme: parse this into a structure + + +class GenericVNF(VNFObject): + """ Class providing file-like API for generic VNF implementation """ - def __init__(self, vnfd): - super(GenericVNF, self).__init__() - self.vnfd = vnfd # fixme: parse this into a structure + def __init__(self, name, vnfd): + super(GenericVNF, self).__init__(name, vnfd) # List of statistics we can obtain from this VNF # - ETSI MANO 6.3.1.1 monitoring_parameter - self.kpi = self._get_kpi_definition(vnfd) + self.kpi = self._get_kpi_definition() # Standard dictionary containing params like thread no, buffer size etc self.config = {} self.runs_traffic = False - self.name = "vnf__1" # name in topology file - self.bin_path = get_nsb_option("bin_path", "") - @classmethod - def _get_kpi_definition(cls, vnfd): + def _get_kpi_definition(self): """ Get list of KPIs defined in VNFD :param vnfd: :return: list of KPIs, e.g. ['throughput', 'latency'] """ - return vnfd['benchmark']['kpi'] - - @classmethod - def get_ip_version(cls, ip_addr): - """ get ip address version v6 or v4 """ - try: - address = ipaddress.ip_address(six.text_type(ip_addr)) - except ValueError: - LOG.error(ip_addr, " is not valid") - return - else: - return address.version - - def _ip_to_hex(self, ip_addr): - ip_x = ip_addr - if self.get_ip_version(ip_addr) == 4: - ip_to_convert = ip_addr.split(".") - ip_octect = [int(octect) for octect in ip_to_convert] - ip_x = "{0[0]:02X}{0[1]:02X}{0[2]:02X}{0[3]:02X}".format(ip_octect) - return ip_x - - def _get_dpdk_port_num(self, name): - for intf in self.vnfd['vdu'][0]['external-interface']: - if name == intf['name']: - return intf['virtual-interface']['dpdk_port_num'] - - def _append_routes(self, ip_pipeline_cfg): - if 'routing_table' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['routing_table'] - - where = ip_pipeline_cfg.find("arp_route_tbl") - link = ip_pipeline_cfg[:where] - route_add = ip_pipeline_cfg[where:] - - tmp = route_add.find('\n') - route_add = route_add[tmp:] - - cmds = "arp_route_tbl =" - - for route in routing_table: - net = self._ip_to_hex(route['network']) - net_nm = self._ip_to_hex(route['netmask']) - net_gw = self._ip_to_hex(route['gateway']) - port = self._get_dpdk_port_num(route['if']) - cmd = \ - " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\ - "{port1_local_ip_hex})".format(port0_local_ip_hex=net, - port0_netmask_hex=net_nm, - dpdk_port=port, - port1_local_ip_hex=net_gw) - cmds += cmd - - cmds += '\n' - ip_pipeline_cfg = link + cmds + route_add - - return ip_pipeline_cfg - - def _append_nd_routes(self, ip_pipeline_cfg): - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - where = ip_pipeline_cfg.find("nd_route_tbl") - link = ip_pipeline_cfg[:where] - route_nd = ip_pipeline_cfg[where:] - - tmp = route_nd.find('\n') - route_nd = route_nd[tmp:] - - cmds = "nd_route_tbl =" - - for route in routing_table: - net = route['network'] - net_nm = route['netmask'] - net_gw = route['gateway'] - port = self._get_dpdk_port_num(route['if']) - cmd = \ - " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\ - "{port1_local_ip_hex})".format(port0_local_ip_hex=net, - port0_netmask_hex=net_nm, - dpdk_port=port, - port1_local_ip_hex=net_gw) - cmds += cmd - - cmds += '\n' - ip_pipeline_cfg = link + cmds + route_nd - - return ip_pipeline_cfg - - def _get_port0localip6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 1: - return_value = route['network'] - LOG.info("_get_port0localip6 : %s", return_value) - return return_value - - def _get_port1localip6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 2: - return_value = route['network'] - LOG.info("_get_port1localip6 : %s", return_value) - return return_value - - def _get_port0prefixlen6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 1: - return_value = route['netmask'] - LOG.info("_get_port0prefixlen6 : %s", return_value) - return return_value - - def _get_port1prefixlen6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 2: - return_value = route['netmask'] - LOG.info("_get_port1prefixlen6 : %s", return_value) - return return_value - - def _get_port0gateway6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 1: - return_value = route['network'] - LOG.info("_get_port0gateway6 : %s", return_value) - return return_value - - def _get_port1gateway6(self): - return_value = "" - if 'nd_route_tbl' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['nd_route_tbl'] - - inc = 0 - for route in routing_table: - inc += 1 - if inc == 2: - return_value = route['network'] - LOG.info("_get_port1gateway6 : %s", return_value) - return return_value + return self.vnfd_helper.kpi def instantiate(self, scenario_cfg, context_cfg): """ Prepare VNF for operation and start the VNF process/VM @@ -284,11 +157,10 @@ class GenericVNF(object): class GenericTrafficGen(GenericVNF): """ Class providing file-like API for generic traffic generator """ - def __init__(self, vnfd): - super(GenericTrafficGen, self).__init__(vnfd) + def __init__(self, name, vnfd): + super(GenericTrafficGen, self).__init__(name, vnfd) self.runs_traffic = True self.traffic_finished = False - self.name = "tgen__1" # name in topology file def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py new file mode 100644 index 000000000..89c086d97 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -0,0 +1,994 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" Base class implementation for generic vnf implementation """ + +from __future__ import absolute_import + +import posixpath +import time +import logging +import os +import re +import subprocess +from collections import Mapping + +from multiprocessing import Queue, Value, Process + +from six.moves import cStringIO + +from yardstick.benchmark.contexts.base import Context +from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file +from yardstick.network_services.helpers.cpu import CpuSysCores +from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig +from yardstick.network_services.nfvi.resource import ResourceProfile +from yardstick.network_services.vnf_generic.vnf.base import GenericVNF +from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper +from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen +from yardstick.network_services.utils import get_nsb_option + +from stl.trex_stl_lib.trex_stl_client import STLClient +from stl.trex_stl_lib.trex_stl_client import LoggerApi +from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError + +from yardstick.ssh import AutoConnectSSH + +DPDK_VERSION = "dpdk-16.07" + +LOG = logging.getLogger(__name__) + + +REMOTE_TMP = "/tmp" + + +class VnfSshHelper(AutoConnectSSH): + + def __init__(self, node, bin_path, wait=None): + self.node = node + kwargs = self.args_from_node(self.node) + if wait: + kwargs.setdefault('wait', wait) + + super(VnfSshHelper, self).__init__(**kwargs) + self.bin_path = bin_path + + @staticmethod + def get_class(): + # must return static class name, anything else refers to the calling class + # i.e. the subclass, not the superclass + return VnfSshHelper + + def copy(self): + # this copy constructor is different from SSH classes, since it uses node + return self.get_class()(self.node, self.bin_path) + + def upload_config_file(self, prefix, content): + cfg_file = os.path.join(REMOTE_TMP, prefix) + LOG.debug(content) + file_obj = cStringIO(content) + self.put_file_obj(file_obj, cfg_file) + return cfg_file + + def join_bin_path(self, *args): + return os.path.join(self.bin_path, *args) + + def provision_tool(self, tool_path=None, tool_file=None): + if tool_path is None: + tool_path = self.bin_path + return super(VnfSshHelper, self).provision_tool(tool_path, tool_file) + + +class SetupEnvHelper(object): + + CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config") + CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script") + CORES = [] + DEFAULT_CONFIG_TPL_CFG = "sample.cfg" + PIPELINE_COMMAND = '' + VNF_TYPE = "SAMPLE" + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(SetupEnvHelper, self).__init__() + self.vnfd_helper = vnfd_helper + self.ssh_helper = ssh_helper + self.scenario_helper = scenario_helper + + def _get_ports_gateway(self, name): + routing_table = self.vnfd_helper.vdu0.get('routing_table', []) + for route in routing_table: + if name == route['if']: + return route['gateway'] + return None + + def build_config(self): + raise NotImplementedError + + def setup_vnf_environment(self): + pass + # raise NotImplementedError + + def tear_down(self): + raise NotImplementedError + + +class DpdkVnfSetupEnvHelper(SetupEnvHelper): + + APP_NAME = 'DpdkVnf' + DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}" + DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}" + FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'" + + HW_DEFAULT_CORE = 3 + SW_DEFAULT_CORE = 2 + + DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)") + + @staticmethod + def _update_packet_type(ip_pipeline_cfg, traffic_options): + match_str = 'pkt_type = ipv4' + replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type']) + pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str) + return pipeline_config_str + + @classmethod + def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options): + traffic_type = traffic_options['traffic_type'] + + if traffic_options['vnf_type'] is not cls.APP_NAME: + match_str = 'traffic_type = 4' + replace_str = 'traffic_type = {0}'.format(traffic_type) + + elif traffic_type == 4: + match_str = 'pkt_type = ipv4' + replace_str = 'pkt_type = ipv4' + + else: + match_str = 'pkt_type = ipv4' + replace_str = 'pkt_type = ipv6' + + pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str) + return pipeline_config_str + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper) + self.all_ports = None + self.bound_pci = None + self._dpdk_nic_bind = None + self.socket = None + + @property + def dpdk_nic_bind(self): + if self._dpdk_nic_bind is None: + self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py") + return self._dpdk_nic_bind + + def _setup_hugepages(self): + cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo" + hugepages = self.ssh_helper.execute(cmd)[1].rstrip() + + memory_path = \ + '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages + self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path) + + if hugepages == "2048kB": + pages = 16384 + else: + pages = 16 + + self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path)) + + def _get_dpdk_port_num(self, name): + interface = self.vnfd_helper.find_interface(name=name) + return interface['virtual-interface']['dpdk_port_num'] + + def build_config(self): + vnf_cfg = self.scenario_helper.vnf_cfg + task_path = self.scenario_helper.task_path + + lb_count = vnf_cfg.get('lb_count', 3) + lb_config = vnf_cfg.get('lb_config', 'SW') + worker_config = vnf_cfg.get('worker_config', '1C/1T') + worker_threads = vnf_cfg.get('worker_threads', 3) + + traffic_type = self.scenario_helper.all_options.get('traffic_type', 4) + traffic_options = { + 'traffic_type': traffic_type, + 'pkt_type': 'ipv%s' % traffic_type, + 'vnf_type': self.VNF_TYPE, + } + + config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path) + config_basename = posixpath.basename(self.CFG_CONFIG) + script_basename = posixpath.basename(self.CFG_SCRIPT) + multiport = MultiPortConfig(self.scenario_helper.topology, + config_tpl_cfg, + config_basename, + self.vnfd_helper.interfaces, + self.VNF_TYPE, + lb_count, + worker_threads, + worker_config, + lb_config, + self.socket) + + multiport.generate_config() + with open(self.CFG_CONFIG) as handle: + new_config = handle.read() + + new_config = self._update_traffic_type(new_config, traffic_options) + new_config = self._update_packet_type(new_config, traffic_options) + + self.ssh_helper.upload_config_file(config_basename, new_config) + self.ssh_helper.upload_config_file(script_basename, + multiport.generate_script(self.vnfd_helper)) + self.all_ports = multiport.port_pair_list + + LOG.info("Provision and start the %s", self.APP_NAME) + self._build_pipeline_kwargs() + return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs) + + def _build_pipeline_kwargs(self): + tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) + ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1) + self.pipeline_kwargs = { + 'cfg_file': self.CFG_CONFIG, + 'script': self.CFG_SCRIPT, + 'ports_len_hex': ports_len_hex, + 'tool_path': tool_path, + } + + def _get_app_cpu(self): + if self.CORES: + return self.CORES + + vnf_cfg = self.scenario_helper.vnf_cfg + sys_obj = CpuSysCores(self.ssh_helper) + self.sys_cpu = sys_obj.get_core_socket() + num_core = int(vnf_cfg["worker_threads"]) + if vnf_cfg.get("lb_config", "SW") == 'HW': + num_core += self.HW_DEFAULT_CORE + else: + num_core += self.SW_DEFAULT_CORE + app_cpu = self.sys_cpu[str(self.socket)][:num_core] + return app_cpu + + def _get_cpu_sibling_list(self, cores=None): + if cores is None: + cores = self._get_app_cpu() + sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list" + awk_template = "awk -F: '{ print $1 }' < %s" + sys_path = "/sys/devices/system/cpu/" + cpu_topology = [] + try: + for core in cores: + sys_cmd = sys_cmd_template % (sys_path, core) + cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1] + cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(',')) + + return cpu_topology + except Exception: + return [] + + def _validate_cpu_cfg(self): + return self._get_cpu_sibling_list() + + def _find_used_drivers(self): + cmd = "{0} -s".format(self.dpdk_nic_bind) + rc, dpdk_status, _ = self.ssh_helper.execute(cmd) + + self.used_drivers = { + vpci: (index, driver) + for index, (vpci, driver) + in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status)) + if any(b.endswith(vpci) for b in self.bound_pci) + } + + def setup_vnf_environment(self): + self._setup_dpdk() + resource = self._setup_resources() + self._kill_vnf() + self._detect_drivers() + return resource + + def _kill_vnf(self): + self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME) + + def _setup_dpdk(self): + """ setup dpdk environment needed for vnf to run """ + + self._setup_hugepages() + self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio") + + exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0] + if exit_status == 0: + return + + dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION) + dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh") + exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0] + if exit_status != 0: + self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) + + def _setup_resources(self): + interfaces = self.vnfd_helper.interfaces + self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] + + # what is this magic? how do we know which socket is for which port? + # what about quad-socket? + if any(v[5] == "0" for v in self.bound_pci): + self.socket = 0 + else: + self.socket = 1 + + cores = self._validate_cpu_cfg() + return ResourceProfile(self.vnfd_helper, cores) + + def _detect_drivers(self): + interfaces = self.vnfd_helper.interfaces + + self._find_used_drivers() + for vpci, (index, _) in self.used_drivers.items(): + try: + intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci']) + except StopIteration: + pass + else: + intf1['dpdk_port_num'] = index + + for vpci in self.bound_pci: + self._bind_dpdk('igb_uio', vpci) + time.sleep(2) + + def _bind_dpdk(self, driver, vpci, force=True): + if force: + force = '--force ' + else: + force = '' + cmd = self.DPDK_BIND_CMD.format(force=force, + dpdk_nic_bind=self.dpdk_nic_bind, + driver=driver, + vpci=vpci) + self.ssh_helper.execute(cmd) + + def _detect_and_bind_dpdk(self, vpci, driver): + find_net_cmd = self.FIND_NET_CMD.format(vpci) + exit_status, _, _ = self.ssh_helper.execute(find_net_cmd) + if exit_status == 0: + # already bound + return None + self._bind_dpdk(driver, vpci) + exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd) + if exit_status != 0: + # failed to bind + return None + return stdout + + def _bind_kernel_devices(self): + for intf in self.vnfd_helper.interfaces: + vi = intf["virtual-interface"] + stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"]) + if stdout is not None: + vi["local_iface_name"] = posixpath.basename(stdout) + + def tear_down(self): + for vpci, (_, driver) in self.used_drivers.items(): + self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind, + driver=driver, + vpci=vpci)) + + +class ResourceHelper(object): + + COLLECT_KPI = '' + MAKE_INSTALL = 'cd {0} && make && sudo make install' + RESOURCE_WORD = 'sample' + + COLLECT_MAP = {} + + def __init__(self, setup_helper): + super(ResourceHelper, self).__init__() + self.resource = None + self.setup_helper = setup_helper + self.ssh_helper = setup_helper.ssh_helper + + def setup(self): + self.resource = self.setup_helper.setup_vnf_environment() + + def generate_cfg(self): + pass + + def _collect_resource_kpi(self): + result = {} + status = self.resource.check_if_sa_running("collectd")[0] + if status: + result = self.resource.amqp_collect_nfvi_kpi() + + result = {"core": result} + return result + + def start_collect(self): + self.resource.initiate_systemagent(self.ssh_helper.bin_path) + self.resource.start() + self.resource.amqp_process_for_nfvi_kpi() + + def stop_collect(self): + if self.resource: + self.resource.stop() + + def collect_kpi(self): + return self._collect_resource_kpi() + + +class ClientResourceHelper(ResourceHelper): + + RUN_DURATION = 60 + QUEUE_WAIT_TIME = 5 + SYNC_PORT = 1 + ASYNC_PORT = 2 + + def __init__(self, setup_helper): + super(ClientResourceHelper, self).__init__(setup_helper) + self.vnfd_helper = setup_helper.vnfd_helper + self.scenario_helper = setup_helper.scenario_helper + + self.client = None + self.client_started = Value('i', 0) + self.my_ports = None + self._queue = Queue() + self._result = {} + self._terminated = Value('i', 0) + self._vpci_ascending = None + + def _build_ports(self): + self.my_ports = [0, 1] + + def get_stats(self, *args, **kwargs): + try: + return self.client.get_stats(*args, **kwargs) + except STLStateError: + LOG.exception("TRex client not connected") + return {} + + def generate_samples(self, key=None, default=None): + last_result = self.get_stats(self.my_ports) + key_value = last_result.get(key, default) + + if not isinstance(last_result, Mapping): # added for mock unit test + self._terminated.value = 1 + return {} + + samples = {} + for vpci_idx, vpci in enumerate(self._vpci_ascending): + name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"] + # fixme: VNFDs KPIs values needs to be mapped to TRex structure + xe_value = last_result.get(vpci_idx, {}) + samples[name] = { + "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), + "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), + "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), + "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), + "in_packets": int(xe_value.get("ipackets", 0)), + "out_packets": int(xe_value.get("opackets", 0)), + } + if key: + samples[name][key] = key_value + return samples + + def _run_traffic_once(self, traffic_profile): + traffic_profile.execute(self) + self.client_started.value = 1 + time.sleep(self.RUN_DURATION) + samples = self.generate_samples() + time.sleep(self.QUEUE_WAIT_TIME) + self._queue.put(samples) + + def run_traffic(self, traffic_profile): + # fixme: fix passing correct trex config file, + # instead of searching the default path + self._build_ports() + self.client = self._connect() + self.client.reset(ports=self.my_ports) + self.client.remove_all_streams(self.my_ports) # remove all streams + traffic_profile.register_generator(self) + + while self._terminated.value == 0: + self._run_traffic_once(traffic_profile) + + self.client.stop(self.my_ports) + self.client.disconnect() + self._terminated.value = 0 + + def terminate(self): + self._terminated.value = 1 # stop client + + def clear_stats(self, ports=None): + if ports is None: + ports = self.my_ports + self.client.clear_stats(ports=ports) + + def start(self, ports=None, *args, **kwargs): + if ports is None: + ports = self.my_ports + self.client.start(ports=ports, *args, **kwargs) + + def collect_kpi(self): + if not self._queue.empty(): + kpi = self._queue.get() + self._result.update(kpi) + LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result)) + return self._result + + def _connect(self, client=None): + if client is None: + client = STLClient(username=self.vnfd_helper.mgmt_interface["user"], + server=self.vnfd_helper.mgmt_interface["ip"], + verbose_level=LoggerApi.VERBOSE_QUIET) + + # try to connect with 5s intervals, 30s max + for idx in range(6): + try: + client.connect() + break + except STLError: + LOG.info("Unable to connect to Trex Server.. Attempt %s", idx) + time.sleep(5) + return client + + +class Rfc2544ResourceHelper(object): + + DEFAULT_CORRELATED_TRAFFIC = False + DEFAULT_LATENCY = False + DEFAULT_TOLERANCE = '0.0001 - 0.0001' + + def __init__(self, scenario_helper): + super(Rfc2544ResourceHelper, self).__init__() + self.scenario_helper = scenario_helper + self._correlated_traffic = None + self.iteration = Value('i', 0) + self._latency = None + self._rfc2544 = None + self._tolerance_low = None + self._tolerance_high = None + + @property + def rfc2544(self): + if self._rfc2544 is None: + self._rfc2544 = self.scenario_helper.all_options['rfc2544'] + return self._rfc2544 + + @property + def tolerance_low(self): + if self._tolerance_low is None: + self.get_rfc_tolerance() + return self._tolerance_low + + @property + def tolerance_high(self): + if self._tolerance_high is None: + self.get_rfc_tolerance() + return self._tolerance_high + + @property + def correlated_traffic(self): + if self._correlated_traffic is None: + self._correlated_traffic = \ + self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC) + + return self._correlated_traffic + + @property + def latency(self): + if self._latency is None: + self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY) + return self._latency + + def get_rfc2544(self, name, default=None): + return self.rfc2544.get(name, default) + + def get_rfc_tolerance(self): + tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE) + tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-'))) + self._tolerance_low = next(tolerance_iter) + self._tolerance_high = next(tolerance_iter, self.tolerance_low) + + +class SampleVNFDeployHelper(object): + + SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf' + REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO) + SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME) + + def __init__(self, vnfd_helper, ssh_helper): + super(SampleVNFDeployHelper, self).__init__() + self.ssh_helper = ssh_helper + self.vnfd_helper = vnfd_helper + + DISABLE_DEPLOY = True + + def deploy_vnfs(self, app_name): + # temp disable for now + if self.DISABLE_DEPLOY: + return + + vnf_bin = self.ssh_helper.join_bin_path(app_name) + exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0] + if not exit_status: + return + + subprocess.check_output(["rm", "-rf", self.REPO_NAME]) + subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO]) + time.sleep(2) + self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR) + self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True) + + build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh') + time.sleep(2) + http_proxy = os.environ.get('http_proxy', '') + https_proxy = os.environ.get('https_proxy', '') + cmd = "sudo -E %s --silent '%s' '%s'" % (build_script, http_proxy, https_proxy) + LOG.debug(cmd) + self.ssh_helper.execute(cmd) + vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name) + self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path) + self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin)) + + +class ScenarioHelper(object): + + DEFAULT_VNF_CFG = { + 'lb_config': 'SW', + 'lb_count': 1, + 'worker_config': '1C/1T', + 'worker_threads': 1, + } + + def __init__(self, name): + self.name = name + self.scenario_cfg = None + + @property + def task_path(self): + return self.scenario_cfg["task_path"] + + @property + def nodes(self): + return self.scenario_cfg['nodes'] + + @property + def all_options(self): + return self.scenario_cfg["options"] + + @property + def options(self): + return self.all_options[self.name] + + @property + def vnf_cfg(self): + return self.options.get('vnf_config', self.DEFAULT_VNF_CFG) + + @property + def topology(self): + return self.scenario_cfg['topology'] + + +class SampleVNF(GenericVNF): + """ Class providing file-like API for generic VNF implementation """ + + VNF_PROMPT = "pipeline>" + WAIT_TIME = 1 + + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + super(SampleVNF, self).__init__(name, vnfd) + self.bin_path = get_nsb_option('bin_path', '') + + self.scenario_helper = ScenarioHelper(self.name) + self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path) + + if setup_env_helper_type is None: + setup_env_helper_type = SetupEnvHelper + + self.setup_helper = setup_env_helper_type(self.vnfd_helper, + self.ssh_helper, + self.scenario_helper) + + self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper) + + if resource_helper_type is None: + resource_helper_type = ResourceHelper + + self.resource_helper = resource_helper_type(self.setup_helper) + + self.all_ports = None + self.context_cfg = None + self.nfvi_context = None + self.pipeline_kwargs = {} + self.priv_ports = None + self.pub_ports = None + # TODO(esm): make QueueFileWrapper invert-able so that we + # never have to manage the queues + self.q_in = Queue() + self.q_out = Queue() + self.queue_wrapper = None + self.run_kwargs = {} + self.scenario_cfg = None + self.tg_port_pairs = None + self.used_drivers = {} + self.vnf_port_pairs = None + self._vnf_process = None + + def _get_route_data(self, route_index, route_type): + route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', [])) + for _ in range(route_index): + next(route_iter, '') + return next(route_iter, {}).get(route_type, '') + + def _get_port0localip6(self): + return_value = self._get_route_data(0, 'network') + LOG.info("_get_port0localip6 : %s", return_value) + return return_value + + def _get_port1localip6(self): + return_value = self._get_route_data(1, 'network') + LOG.info("_get_port1localip6 : %s", return_value) + return return_value + + def _get_port0prefixlen6(self): + return_value = self._get_route_data(0, 'netmask') + LOG.info("_get_port0prefixlen6 : %s", return_value) + return return_value + + def _get_port1prefixlen6(self): + return_value = self._get_route_data(1, 'netmask') + LOG.info("_get_port1prefixlen6 : %s", return_value) + return return_value + + def _get_port0gateway6(self): + return_value = self._get_route_data(0, 'network') + LOG.info("_get_port0gateway6 : %s", return_value) + return return_value + + def _get_port1gateway6(self): + return_value = self._get_route_data(1, 'network') + LOG.info("_get_port1gateway6 : %s", return_value) + return return_value + + def _start_vnf(self): + self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) + self._vnf_process = Process(target=self._run) + self._vnf_process.start() + + def _vnf_up_post(self): + pass + + def instantiate(self, scenario_cfg, context_cfg): + self.scenario_helper.scenario_cfg = scenario_cfg + self.context_cfg = context_cfg + self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) + # self.nfvi_context = None + + self.deploy_helper.deploy_vnfs(self.APP_NAME) + self.resource_helper.setup() + self._start_vnf() + + def wait_for_instantiate(self): + buf = [] + time.sleep(self.WAIT_TIME) # Give some time for config to load + while True: + if not self._vnf_process.is_alive(): + raise RuntimeError("%s VNF process died." % self.APP_NAME) + + # TODO(esm): move to QueueFileWrapper + while self.q_out.qsize() > 0: + buf.append(self.q_out.get()) + message = ''.join(buf) + if self.VNF_PROMPT in message: + LOG.info("%s VNF is up and running.", self.APP_NAME) + self._vnf_up_post() + self.queue_wrapper.clear() + self.resource_helper.start_collect() + return self._vnf_process.exitcode + + if "PANIC" in message: + raise RuntimeError("Error starting %s VNF." % + self.APP_NAME) + + LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) + time.sleep(1) + + def _build_run_kwargs(self): + self.run_kwargs = { + 'stdin': self.queue_wrapper, + 'stdout': self.queue_wrapper, + 'keep_stdin_open': True, + 'pty': True, + } + + def _build_config(self): + return self.setup_helper.build_config() + + def _run(self): + # we can't share ssh paramiko objects to force new connection + self.ssh_helper.drop_connection() + cmd = self._build_config() + # kill before starting + self.ssh_helper.execute("pkill {}".format(self.APP_NAME)) + + LOG.debug(cmd) + self._build_run_kwargs() + self.ssh_helper.run(cmd, **self.run_kwargs) + + def vnf_execute(self, cmd, wait_time=2): + """ send cmd to vnf process """ + + LOG.info("%s command: %s", self.APP_NAME, cmd) + self.q_in.put("{}\r\n".format(cmd)) + time.sleep(wait_time) + output = [] + while self.q_out.qsize() > 0: + output.append(self.q_out.get()) + return "".join(output) + + def _tear_down(self): + pass + + def terminate(self): + self.vnf_execute("quit") + if self._vnf_process: + self._vnf_process.terminate() + self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME) + self._tear_down() + self.resource_helper.stop_collect() + + def get_stats(self, *args, **kwargs): + """ + Method for checking the statistics + + :return: + VNF statistics + """ + cmd = 'p {0} stats'.format(self.APP_WORD) + out = self.vnf_execute(cmd) + return out + + def collect_kpi(self): + stats = self.get_stats() + m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) + if m: + result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()} + result["collect_stats"] = self.resource_helper.collect_kpi() + else: + result = { + "packets_in": 0, + "packets_fwd": 0, + "packets_dropped": 0, + } + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) + return result + + +class SampleVNFTrafficGen(GenericTrafficGen): + """ Class providing file-like API for generic traffic generator """ + + APP_NAME = 'Sample' + RUN_WAIT = 1 + + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + super(SampleVNFTrafficGen, self).__init__(name, vnfd) + self.bin_path = get_nsb_option('bin_path', '') + self.name = "tgen__1" # name in topology file + + self.scenario_helper = ScenarioHelper(self.name) + self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True) + + if setup_env_helper_type is None: + setup_env_helper_type = SetupEnvHelper + + self.setup_helper = setup_env_helper_type(self.vnfd_helper, + self.ssh_helper, + self.scenario_helper) + + if resource_helper_type is None: + resource_helper_type = ClientResourceHelper + + self.resource_helper = resource_helper_type(self.setup_helper) + + self.runs_traffic = True + self.traffic_finished = False + self.tg_port_pairs = None + self._tg_process = None + self._traffic_process = None + + def _start_server(self): + # we can't share ssh paramiko objects to force new connection + self.ssh_helper.drop_connection() + + def instantiate(self, scenario_cfg, context_cfg): + self.scenario_helper.scenario_cfg = scenario_cfg + self.resource_helper.generate_cfg() + self.setup_helper.setup_vnf_environment() + self.resource_helper.setup() + + LOG.info("Starting %s server...", self.APP_NAME) + self._tg_process = Process(target=self._start_server) + self._tg_process.start() + + def wait_for_instantiate(self): + return self._wait_for_process() + + def _check_status(self): + raise NotImplementedError + + def _wait_for_process(self): + while True: + if not self._tg_process.is_alive(): + raise RuntimeError("%s traffic generator process died." % self.APP_NAME) + LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME) + time.sleep(1) + status = self._check_status() + if status == 0: + LOG.info("%s TG Server is up and running.", self.APP_NAME) + return self._tg_process.exitcode + + def _traffic_runner(self, traffic_profile): + LOG.info("Starting %s client...", self.APP_NAME) + self.resource_helper.run_traffic(traffic_profile) + + def run_traffic(self, traffic_profile): + """ Generate traffic on the wire according to the given params. + Method is non-blocking, returns immediately when traffic process + is running. Mandatory. + + :param traffic_profile: + :return: True/False + """ + self._traffic_process = Process(target=self._traffic_runner, + args=(traffic_profile,)) + self._traffic_process.start() + # Wait for traffic process to start + while self.resource_helper.client_started.value == 0: + time.sleep(self.RUN_WAIT) + + return self._traffic_process.is_alive() + + def listen_traffic(self, traffic_profile): + """ Listen to traffic with the given parameters. + Method is non-blocking, returns immediately when traffic process + is running. Optional. + + :param traffic_profile: + :return: True/False + """ + pass + + def verify_traffic(self, traffic_profile): + """ Verify captured traffic after it has ended. Optional. + + :param traffic_profile: + :return: dict + """ + pass + + def collect_kpi(self): + result = self.resource_helper.collect_kpi() + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) + return result + + def terminate(self): + """ After this method finishes, all traffic processes should stop. Mandatory. + + :return: True/False + """ + self.traffic_finished = True + if self._traffic_process is not None: + self._traffic_process.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py index 000a91db4..e65296287 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py @@ -16,14 +16,13 @@ from __future__ import absolute_import from __future__ import print_function import logging -import multiprocessing import re -import time -import os -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen -from yardstick.network_services.utils import provision_tool +from multiprocessing import Queue +from ipaddress import IPv4Interface + +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen +from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper LOG = logging.getLogger(__name__) @@ -42,77 +41,59 @@ class PingParser(object): if match: # IMPORTANT: in order for the data to be properly taken # in by InfluxDB, it needs to be converted to numeric types - self.queue.put({"packets_received": float(match.group(1)), - "rtt": float(match.group(2))}) + self.queue.put({ + "packets_received": float(match.group(1)), + "rtt": float(match.group(2)), + }) def close(self): - ''' close the ssh connection ''' - pass + """ close the ssh connection """ + self.closed = True def clear(self): - ''' clear queue till Empty ''' + """ clear queue till Empty """ while self.queue.qsize() > 0: self.queue.get() -class PingTrafficGen(GenericTrafficGen): +class PingSetupEnvHelper(DpdkVnfSetupEnvHelper): + + def setup_vnf_environment(self): + self._bind_kernel_devices() + + +class PingTrafficGen(SampleVNFTrafficGen): """ This traffic generator can ping a single IP with pingsize and target given in traffic profile """ - def __init__(self, vnfd): - super(PingTrafficGen, self).__init__(vnfd) + TG_NAME = 'Ping' + RUN_WAIT = 4 + + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = PingSetupEnvHelper + + super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) + self._queue = Queue() + self._parser = PingParser(self._queue) self._result = {} - self._parser = None - self._queue = None - self._traffic_process = None - - mgmt_interface = vnfd["mgmt-interface"] - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - - def _bind_device_kernel(self, connection): - dpdk_nic_bind = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "dpdk_nic_bind.py")) - - drivers = {intf["virtual-interface"]["vpci"]: - intf["virtual-interface"]["driver"] - for intf in self.vnfd["vdu"][0]["external-interface"]} - - commands = \ - ['"{0}" --force -b "{1}" "{2}"'.format(dpdk_nic_bind, value, key) - for key, value in drivers.items()] - for command in commands: - connection.execute(command) - - for index, out in enumerate(self.vnfd["vdu"][0]["external-interface"]): - vpci = out["virtual-interface"]["vpci"] - net = "find /sys/class/net -lname '*{}*' -printf '%f'".format(vpci) - out = connection.execute(net)[1] - ifname = out.split('/')[-1].strip('\n') - self.vnfd["vdu"][0]["external-interface"][index][ - "virtual-interface"]["local_iface_name"] = ifname def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(PingTrafficGen, self).scale(flavor) + """ scale vnf-based on flavor input """ + pass - def instantiate(self, scenario_cfg, context_cfg): - self._result = {"packets_received": 0, "rtt": 0} - self._bind_device_kernel(self.connection) + def _check_status(self): + return self._tg_process.is_alive() - def run_traffic(self, traffic_profile): - self._queue = multiprocessing.Queue() - self._parser = PingParser(self._queue) - self._traffic_process = \ - multiprocessing.Process(target=self._traffic_runner, - args=(traffic_profile, self._parser)) - self._traffic_process.start() - # Wait for traffic process to start - time.sleep(4) - return self._traffic_process.is_alive() + def instantiate(self, scenario_cfg, context_cfg): + self._result = { + "packets_received": 0, + "rtt": 0, + } + self.setup_helper.setup_vnf_environment() def listen_traffic(self, traffic_profile): """ Not needed for ping @@ -122,38 +103,26 @@ class PingTrafficGen(GenericTrafficGen): """ pass - def _traffic_runner(self, traffic_profile, filewrapper): - - mgmt_interface = self.vnfd["mgmt-interface"] - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - external_interface = self.vnfd["vdu"][0]["external-interface"] - virtual_interface = external_interface[0]["virtual-interface"] - target_ip = virtual_interface["dst_ip"].split('/')[0] - local_ip = virtual_interface["local_ip"].split('/')[0] - local_if_name = \ - virtual_interface["local_iface_name"].split('/')[0] - packet_size = traffic_profile.params["traffic_profile"]["frame_size"] - - run_cmd = [] - - run_cmd.append("ip addr flush %s" % local_if_name) - run_cmd.append("ip addr add %s/24 dev %s" % (local_ip, local_if_name)) - run_cmd.append("ip link set %s up" % local_if_name) - - for cmd in run_cmd: - self.connection.execute(cmd) - - ping_cmd = ("ping -s %s %s" % (packet_size, target_ip)) - self.connection.run(ping_cmd, stdout=filewrapper, + def _traffic_runner(self, traffic_profile): + intf = self.vnfd_helper.interfaces[0]["virtual-interface"] + profile = traffic_profile.params["traffic_profile"] + cmd_kwargs = { + 'target_ip': IPv4Interface(intf["dst_ip"]).ip.exploded, + 'local_ip': IPv4Interface(intf["local_ip"]).ip.exploded, + 'local_if_name': intf["local_iface_name"].split('/')[0], + 'packet_size': profile["frame_size"], + } + + cmd_list = [ + "sudo ip addr flush {local_if_name}", + "sudo ip addr add {local_ip}/24 dev {local_if_name}", + "sudo ip link set {local_if_name} up", + ] + + for cmd in cmd_list: + self.ssh_helper.execute(cmd.format(**cmd_kwargs)) + + ping_cmd = "ping -s {packet_size} {target_ip}" + self.ssh_helper.run(ping_cmd.format(**cmd_kwargs), + stdout=self._parser, keep_stdin_open=True, pty=True) - - def collect_kpi(self): - if not self._queue.empty(): - kpi = self._queue.get() - self._result.update(kpi) - return self._result - - def terminate(self): - if self._traffic_process is not None: - self._traffic_process.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py index 7da4b31e9..79e42e0a8 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py @@ -15,267 +15,98 @@ from __future__ import absolute_import from __future__ import print_function -import multiprocessing import time import logging -import os -import yaml +from collections import Mapping +from itertools import chain -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen -from yardstick.network_services.utils import get_nsb_option -from stl.trex_stl_lib.trex_stl_client import STLClient -from stl.trex_stl_lib.trex_stl_client import LoggerApi -from stl.trex_stl_lib.trex_stl_exceptions import STLError +from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig +from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen +from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper +from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper LOGGING = logging.getLogger(__name__) -DURATION = 30 -WAIT_TIME = 3 -TREX_SYNC_PORT = 4500 -TREX_ASYNC_PORT = 4501 +class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper): -class TrexTrafficGenRFC(GenericTrafficGen): - """ - This class handles mapping traffic profile and generating - traffic for rfc2544 testcase. - """ - - def __init__(self, vnfd): - super(TrexTrafficGenRFC, self).__init__(vnfd) - self._result = {} - self._terminated = multiprocessing.Value('i', 0) - self._queue = multiprocessing.Queue() - self._terminated = multiprocessing.Value('i', 0) - self._traffic_process = None - self._vpci_ascending = None - self.tc_file_name = None - self.client = None - self.my_ports = None - - mgmt_interface = self.vnfd["mgmt-interface"] - - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - - @classmethod - def _split_mac_address_into_list(cls, mac): - octets = mac.split(':') - for i, elem in enumerate(octets): - octets[i] = "0x" + str(elem) - return octets - - def _generate_trex_cfg(self, vnfd): - """ - - :param vnfd: vnfd.yaml - :return: trex_cfg.yaml file - """ - trex_cfg = dict( - port_limit=0, - version='2', - interfaces=[], - port_info=list(dict( - )) - ) - trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"]) - trex_cfg["version"] = '2' - - cfg_file = [] - vpci = [] - port = {} - - ext_intf = vnfd["vdu"][0]["external-interface"] - for interface in ext_intf: - virt_intf = interface["virtual-interface"] - vpci.append(virt_intf["vpci"]) - - port["src_mac"] = \ - self._split_mac_address_into_list(virt_intf["local_mac"]) - - time.sleep(WAIT_TIME) - port["dest_mac"] = \ - self._split_mac_address_into_list(virt_intf["dst_mac"]) - if virt_intf["dst_mac"]: - trex_cfg["port_info"].append(port.copy()) - - trex_cfg["interfaces"] = vpci - cfg_file.append(trex_cfg) - - with open('/tmp/trex_cfg.yaml', 'w') as outfile: - outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False)) - self.connection.put('/tmp/trex_cfg.yaml', '/etc') - - self._vpci_ascending = sorted(vpci) - - def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(TrexTrafficGenRFC, self).scale(flavor) - - def instantiate(self, scenario_cfg, context_cfg): - self._generate_trex_cfg(self.vnfd) - self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc']) - trex = os.path.join(self.bin_path, "trex") - err, _, _ = \ - self.connection.execute("ls {} >/dev/null 2>&1".format(trex)) - if err != 0: - self.connection.put(trex, trex, True) + def is_done(self): + return self.latency and self.iteration.value > 10 - LOGGING.debug("Starting TRex server...") - _tg_server = \ - multiprocessing.Process(target=self._start_server) - _tg_server.start() - while True: - LOGGING.info("Waiting for TG Server to start.. ") - time.sleep(WAIT_TIME) - status = \ - self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0] - if status == 0: - LOGGING.info("TG server is up and running.") - return _tg_server.exitcode - if not _tg_server.is_alive(): - raise RuntimeError("Traffic Generator process died.") +class TrexRfcResourceHelper(TrexResourceHelper): - def listen_traffic(self, traffic_profile): - pass + LATENCY_TIME_SLEEP = 120 + RUN_DURATION = 30 + WAIT_TIME = 3 - def _get_logical_if_name(self, vpci): - ext_intf = self.vnfd["vdu"][0]["external-interface"] - for interface in range(len(self.vnfd["vdu"][0]["external-interface"])): - virtual_intf = ext_intf[interface]["virtual-interface"] - if virtual_intf["vpci"] == vpci: - return ext_intf[interface]["name"] + def __init__(self, setup_helper, rfc_helper_type=None): + super(TrexRfcResourceHelper, self).__init__(setup_helper) - def run_traffic(self, traffic_profile, - client_started=multiprocessing.Value('i', 0)): + if rfc_helper_type is None: + rfc_helper_type = TrexRfc2544ResourceHelper - self._traffic_process = \ - multiprocessing.Process(target=self._traffic_runner, - args=(traffic_profile, self._queue, - client_started, self._terminated)) - self._traffic_process.start() - # Wait for traffic process to start - while client_started.value == 0: - time.sleep(1) + self.rfc2544_helper = rfc_helper_type(self.scenario_helper) + # self.tg_port_pairs = [] - return self._traffic_process.is_alive() + def _build_ports(self): + self.tg_port_pairs, self.networks = MultiPortConfig.get_port_pairs( + self.vnfd_helper.interfaces) + self.priv_ports = [int(x[0][-1]) for x in self.tg_port_pairs] + self.pub_ports = [int(x[1][-1]) for x in self.tg_port_pairs] + self.my_ports = list(set(chain(self.priv_ports, self.pub_ports))) - def _start_server(self): - mgmt_interface = self.vnfd["mgmt-interface"] - - _server = ssh.SSH.from_node(mgmt_interface) - _server.wait() - - _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) - _server.execute("pkill -9 rex > /dev/null 2>&1") - - trex_path = os.path.join(self.bin_path, "trex/scripts") - path = get_nsb_option("trex_path", trex_path) - trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1" - - _server.execute(trex_cmd) - - def _connect_client(self, client=None): - if client is None: - client = STLClient(username=self.vnfd["mgmt-interface"]["user"], - server=self.vnfd["mgmt-interface"]["ip"], - verbose_level=LoggerApi.VERBOSE_QUIET) - for idx in range(6): - try: - client.connect() - break - except STLError: - LOGGING.info("Unable to connect to Trex. Attempt %s", idx) - time.sleep(WAIT_TIME) - return client - - @classmethod - def _get_rfc_tolerance(cls, tc_yaml): - tolerance = '0.8 - 1.0' - if 'tc_options' in tc_yaml['scenarios'][0]: - tc_options = tc_yaml['scenarios'][0]['tc_options'] - if 'rfc2544' in tc_options: - tolerance = \ - tc_options['rfc2544'].get('allowed_drop_rate', '0.8 - 1.0') - - tolerance = tolerance.split('-') - min_tol = float(tolerance[0]) - if len(tolerance) == 2: - max_tol = float(tolerance[1]) - else: - max_tol = float(tolerance[0]) - - return [min_tol, max_tol] - - def _traffic_runner(self, traffic_profile, queue, - client_started, terminated): - LOGGING.info("Starting TRex client...") - tc_yaml = {} - - with open(self.tc_file_name) as tc_file: - tc_yaml = yaml.load(tc_file.read()) + def _run_traffic_once(self, traffic_profile): + traffic_profile.execute(self) + self.client_started.value = 1 + time.sleep(self.RUN_DURATION) + self.client.stop(self.my_ports) + time.sleep(self.WAIT_TIME) + samples = traffic_profile.get_drop_percentage(self) + self._queue.put(samples) - tolerance = self._get_rfc_tolerance(tc_yaml) + if not self.rfc2544_helper.is_done(): + return - # fixme: fix passing correct trex config file, - # instead of searching the default path - self.my_ports = [0, 1] - self.client = self._connect_client() + self.client.stop(self.my_ports) self.client.reset(ports=self.my_ports) - self.client.remove_all_streams(self.my_ports) # remove all streams - while not terminated.value: - traffic_profile.execute(self) - client_started.value = 1 - time.sleep(DURATION) + self.client.remove_all_streams(self.my_ports) + traffic_profile.execute_latency(samples=samples) + multiplier = traffic_profile.calculate_pps(samples)[1] + for _ in range(5): + time.sleep(self.LATENCY_TIME_SLEEP) self.client.stop(self.my_ports) - time.sleep(WAIT_TIME) + time.sleep(self.WAIT_TIME) last_res = self.client.get_stats(self.my_ports) - samples = {} - for vpci_idx in range(len(self._vpci_ascending)): - name = \ - self._get_logical_if_name(self._vpci_ascending[vpci_idx]) - # fixme: VNFDs KPIs values needs to be mapped to TRex structure - if not isinstance(last_res, dict): - terminated.value = 1 - last_res = {} + if not isinstance(last_res, Mapping): + self._terminated.value = 1 + continue + self.generate_samples('latency', {}) + self._queue.put(samples) + self.client.start(mult=str(multiplier), + ports=self.my_ports, + duration=120, force=True) - samples[name] = \ - {"rx_throughput_fps": - float(last_res.get(vpci_idx, {}).get("rx_pps", 0.0)), - "tx_throughput_fps": - float(last_res.get(vpci_idx, {}).get("tx_pps", 0.0)), - "rx_throughput_mbps": - float(last_res.get(vpci_idx, {}).get("rx_bps", 0.0)), - "tx_throughput_mbps": - float(last_res.get(vpci_idx, {}).get("tx_bps", 0.0)), - "in_packets": - last_res.get(vpci_idx, {}).get("ipackets", 0), - "out_packets": - last_res.get(vpci_idx, {}).get("opackets", 0)} + def start_client(self, mult, duration, force=True): + self.client.start(ports=self.my_ports, mult=mult, duration=duration, force=force) - samples = \ - traffic_profile.get_drop_percentage(self, samples, - tolerance[0], tolerance[1]) - queue.put(samples) - self.client.stop(self.my_ports) - self.client.disconnect() - queue.put(samples) + def clear_client_stats(self): + self.client.clear_stats(ports=self.my_ports) def collect_kpi(self): - if not self._queue.empty(): - result = self._queue.get() - self._result.update(result) - LOGGING.debug("trex collect Kpis %s", self._result) - return self._result + self.rfc2544_helper.iteration.value += 1 + super(TrexRfcResourceHelper, self).collect_kpi() + - def terminate(self): - self._terminated.value = 1 # stop Trex clinet +class TrexTrafficGenRFC(TrexTrafficGen): + """ + This class handles mapping traffic profile and generating + traffic for rfc2544 testcase. + """ - self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = TrexRfcResourceHelper - if self._traffic_process: - self._traffic_process.terminate() + super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py index 058b715fe..616b331ba 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py @@ -15,261 +15,136 @@ from __future__ import absolute_import from __future__ import print_function -import multiprocessing -import time + import logging import os + import yaml -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen +from yardstick.common.utils import mac_address_to_hex_list from yardstick.network_services.utils import get_nsb_option -from yardstick.network_services.utils import provision_tool -from stl.trex_stl_lib.trex_stl_client import STLClient -from stl.trex_stl_lib.trex_stl_client import LoggerApi -from stl.trex_stl_lib.trex_stl_exceptions import STLError +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen +from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper LOG = logging.getLogger(__name__) -DURATION = 30 -WAIT_QUEUE = 1 -TREX_SYNC_PORT = 4500 -TREX_ASYNC_PORT = 4501 -class TrexTrafficGen(GenericTrafficGen): +class TrexResourceHelper(ClientResourceHelper): + + CONF_FILE = '/tmp/trex_cfg.yaml' + QUEUE_WAIT_TIME = 1 + RESOURCE_WORD = 'trex' + RUN_DURATION = 0 + + SYNC_PORT = 4500 + ASYNC_PORT = 4501 + + def generate_cfg(self): + ext_intf = self.vnfd_helper.interfaces + vpci_list = [] + port_list = [] + trex_cfg = { + 'port_limit': 0, + 'version': '2', + 'interfaces': vpci_list, + 'port_info': port_list, + "port_limit": len(ext_intf), + "version": '2', + } + cfg_file = [trex_cfg] + + for interface in ext_intf: + virtual_interface = interface['virtual-interface'] + vpci_list.append(virtual_interface["vpci"]) + dst_mac = virtual_interface["dst_mac"] + + if not dst_mac: + continue + + local_mac = virtual_interface["local_mac"] + port_list.append({ + "src_mac": mac_address_to_hex_list(local_mac), + "dest_mac": mac_address_to_hex_list(dst_mac), + }) + + cfg_str = yaml.safe_dump(cfg_file, default_flow_style=False, explicit_start=True) + self.ssh_helper.upload_config_file(os.path.basename(self.CONF_FILE), cfg_str) + self._vpci_ascending = sorted(vpci_list) + + def check_status(self): + status, _, _ = self.ssh_helper.execute("sudo lsof -i:%s" % self.SYNC_PORT) + return status + + # temp disable + DISABLE_DEPLOY = True + + def setup(self): + if self.DISABLE_DEPLOY: + return + + trex_path = self.ssh_helper.join_bin_path('trex') + + err = self.ssh_helper.execute("which {}".format(trex_path))[0] + if err == 0: + return + + LOG.info("Copying %s to destination...", self.RESOURCE_WORD) + self.ssh_helper.run("sudo mkdir -p '{}'".format(os.path.dirname(trex_path))) + self.ssh_helper.put("~/.bash_profile", "~/.bash_profile") + self.ssh_helper.put(trex_path, trex_path, True) + ko_src = os.path.join(trex_path, "scripts/ko/src/") + self.ssh_helper.execute(self.MAKE_INSTALL.format(ko_src)) + + def start(self, ports=None, *args, **kwargs): + cmd = "sudo fuser -n tcp {0.SYNC_PORT} {0.ASYNC_PORT} -k > /dev/null 2>&1" + self.ssh_helper.execute(cmd.format(self)) + + self.ssh_helper.execute("sudo pkill -9 rex > /dev/null 2>&1") + + trex_path = self.ssh_helper.join_bin_path("trex", "scripts") + path = get_nsb_option("trex_path", trex_path) + + # cmd = "sudo ./t-rex-64 -i --cfg %s > /dev/null 2>&1" % self.CONF_FILE + cmd = "./t-rex-64 -i --cfg '{}'".format(self.CONF_FILE) + + # if there are errors we want to see them + # we have to sudo cd because the path might be owned by root + trex_cmd = """sudo bash -c "cd '{}' ; {}" >/dev/null""".format(path, cmd) + self.ssh_helper.execute(trex_cmd) + + def terminate(self): + super(TrexResourceHelper, self).terminate() + cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1" + self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT)) + + +class TrexTrafficGen(SampleVNFTrafficGen): """ This class handles mapping traffic profile and generating traffic for given testcase """ - def __init__(self, vnfd): - super(TrexTrafficGen, self).__init__(vnfd) - self._result = {} - self._queue = multiprocessing.Queue() - self._terminated = multiprocessing.Value('i', 0) - self._traffic_process = None - self._vpci_ascending = None - self.client = None - self.my_ports = None - self.client_started = multiprocessing.Value('i', 0) - - mgmt_interface = vnfd["mgmt-interface"] - - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - - @classmethod - def _split_mac_address_into_list(cls, mac): - octets = mac.split(':') - for i, elem in enumerate(octets): - octets[i] = "0x" + str(elem) - return octets - - def _generate_trex_cfg(self, vnfd): - """ - - :param vnfd: vnfd.yaml - :return: trex_cfg.yaml file - """ - trex_cfg = dict( - port_limit=0, - version='2', - interfaces=[], - port_info=list(dict( - )) - ) - trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"]) - trex_cfg["version"] = '2' - - cfg_file = [] - vpci = [] - port = {} - - for interface in range(len(vnfd["vdu"][0]["external-interface"])): - ext_intrf = vnfd["vdu"][0]["external-interface"] - virtual_interface = ext_intrf[interface]["virtual-interface"] - vpci.append(virtual_interface["vpci"]) - - port["src_mac"] = self._split_mac_address_into_list( - virtual_interface["local_mac"]) - port["dest_mac"] = self._split_mac_address_into_list( - virtual_interface["dst_mac"]) - - trex_cfg["port_info"].append(port.copy()) - - trex_cfg["interfaces"] = vpci - cfg_file.append(trex_cfg) - - with open('/tmp/trex_cfg.yaml', 'w') as outfile: - outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False)) - self.connection.put('/tmp/trex_cfg.yaml', '/etc') - - self._vpci_ascending = sorted(vpci) - - @classmethod - def __setup_hugepages(cls, connection): - hugepages = \ - connection.execute( - "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1] - hugepages = hugepages.rstrip() - - memory_path = \ - '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages - connection.execute("awk -F: '{ print $1 }' < %s" % memory_path) - - pages = 16384 if hugepages.rstrip() == "2048kB" else 16 - connection.execute("echo %s > %s" % (pages, memory_path)) - - def setup_vnf_environment(self, connection): - ''' setup dpdk environment needed for vnf to run ''' - - self.__setup_hugepages(connection) - connection.execute("modprobe uio && modprobe igb_uio") - - exit_status = connection.execute("lsmod | grep -i igb_uio")[0] - if exit_status == 0: - return + APP_NAME = 'TRex' - dpdk = os.path.join(self.bin_path, "dpdk-16.07") - dpdk_setup = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "nsb_setup.sh")) - status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0] - if status: - connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = TrexResourceHelper - def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(TrexTrafficGen, self).scale(flavor) - - def instantiate(self, scenario_cfg, context_cfg): - self._generate_trex_cfg(self.vnfd) - self.setup_vnf_environment(self.connection) - - trex = os.path.join(self.bin_path, "trex") - err = \ - self.connection.execute("ls {} >/dev/null 2>&1".format(trex))[0] - if err != 0: - LOG.info("Copying trex to destination...") - self.connection.put("/root/.bash_profile", "/root/.bash_profile") - self.connection.put(trex, trex, True) - ko_src = os.path.join(trex, "scripts/ko/src/") - self.connection.execute("cd %s && make && make install" % ko_src) - - LOG.info("Starting TRex server...") - _tg_process = \ - multiprocessing.Process(target=self._start_server) - _tg_process.start() - while True: - if not _tg_process.is_alive(): - raise RuntimeError("Traffic Generator process died.") - LOG.info("Waiting for TG Server to start.. ") - time.sleep(1) - status = \ - self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0] - if status == 0: - LOG.info("TG server is up and running.") - return _tg_process.exitcode - - def listen_traffic(self, traffic_profile): - pass + super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) - def _get_logical_if_name(self, vpci): - ext_intf = self.vnfd["vdu"][0]["external-interface"] - for interface in range(len(self.vnfd["vdu"][0]["external-interface"])): - virtual_intf = ext_intf[interface]["virtual-interface"] - if virtual_intf["vpci"] == vpci: - return ext_intf[interface]["name"] - - def run_traffic(self, traffic_profile): - self._traffic_process = \ - multiprocessing.Process(target=self._traffic_runner, - args=(traffic_profile, self._queue, - self.client_started, - self._terminated)) - self._traffic_process.start() - # Wait for traffic process to start - while self.client_started.value == 0: - time.sleep(1) - - return self._traffic_process.is_alive() + def _check_status(self): + return self.resource_helper.check_status() def _start_server(self): - mgmt_interface = self.vnfd["mgmt-interface"] - - _server = ssh.SSH.from_node(mgmt_interface) - _server.wait() + super(TrexTrafficGen, self)._start_server() + self.resource_helper.start() - _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) + def scale(self, flavor=""): + pass - trex_path = os.path.join(self.bin_path, "trex/scripts") - path = get_nsb_option("trex_path", trex_path) - trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1" - - _server.execute(trex_cmd) - - def _connect_client(self, client=None): - if client is None: - client = STLClient(username=self.vnfd["mgmt-interface"]["user"], - server=self.vnfd["mgmt-interface"]["ip"], - verbose_level=LoggerApi.VERBOSE_QUIET) - # try to connect with 5s intervals, 30s max - for idx in range(6): - try: - client.connect() - break - except STLError: - LOG.info("Unable to connect to Trex Server.. Attempt %s", idx) - time.sleep(5) - return client - - def _traffic_runner(self, traffic_profile, queue, - client_started, terminated): - LOG.info("Starting TRex client...") - - self.my_ports = [0, 1] - self.client = self._connect_client() - self.client.reset(ports=self.my_ports) - - self.client.remove_all_streams(self.my_ports) # remove all streams - - while not terminated.value: - traffic_profile.execute(self) - client_started.value = 1 - last_res = self.client.get_stats(self.my_ports) - if not isinstance(last_res, dict): # added for mock unit test - terminated.value = 1 - last_res = {} - - samples = {} - for vpci_idx in range(len(self._vpci_ascending)): - name = \ - self._get_logical_if_name(self._vpci_ascending[vpci_idx]) - # fixme: VNFDs KPIs values needs to be mapped to TRex structure - xe_value = last_res.get(vpci_idx, {}) - samples[name] = \ - {"rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), - "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), - "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), - "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), - "in_packets": xe_value.get("ipackets", 0), - "out_packets": xe_value.get("opackets", 0)} - time.sleep(WAIT_QUEUE) - queue.put(samples) - - self.client.disconnect() - terminated.value = 0 - - def collect_kpi(self): - if not self._queue.empty(): - self._result.update(self._queue.get()) - LOG.debug("trex collect Kpis %s", self._result) - return self._result + def listen_traffic(self, traffic_profile): + pass def terminate(self): - self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) - self.traffic_finished = True - if self._traffic_process: - self._traffic_process.terminate() + self.resource_helper.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index e9e80bdfb..310ab67cb 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -15,313 +15,268 @@ from __future__ import absolute_import from __future__ import print_function -import tempfile -import time import os import logging import re -from multiprocessing import Queue -import multiprocessing -import ipaddress -import six +import posixpath -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericVNF -from yardstick.network_services.utils import provision_tool -from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper -from yardstick.network_services.nfvi.resource import ResourceProfile +from six.moves import configparser, zip -LOG = logging.getLogger(__name__) -VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}' -CORES = ['0', '1', '2'] -WAIT_TIME = 20 +from yardstick.network_services.pipeline import PipelineRules +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper +LOG = logging.getLogger(__name__) -class VpeApproxVnf(GenericVNF): +VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}""" + +VPE_COLLECT_KPI = """\ +Pkts in:\s(\d+)\r\n\ +\tPkts dropped by Pkts in:\s(\d+)\r\n\ +\tPkts dropped by AH:\s(\d+)\r\n\\ +\tPkts dropped by other:\s(\d+)\ +""" + + +class ConfigCreate(object): + + @staticmethod + def vpe_tmq(config, index): + tm_q = 'TM{0}'.format(index) + config.add_section(tm_q) + config.set(tm_q, 'burst_read', '24') + config.set(tm_q, 'burst_write', '32') + config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg') + return config + + def __init__(self, priv_ports, pub_ports, socket): + super(ConfigCreate, self).__init__() + self.sw_q = -1 + self.sink_q = -1 + self.n_pipeline = 1 + self.priv_ports = priv_ports + self.pub_ports = pub_ports + self.pipeline_per_port = 9 + self.socket = socket + + def vpe_initialize(self, config): + config.add_section('EAL') + config.set('EAL', 'log_level', '0') + + config.add_section('PIPELINE0') + config.set('PIPELINE0', 'type', 'MASTER') + config.set('PIPELINE0', 'core', 's%sC0' % self.socket) + + config.add_section('MEMPOOL0') + config.set('MEMPOOL0', 'pool_size', '256K') + + config.add_section('MEMPOOL1') + config.set('MEMPOOL1', 'pool_size', '2M') + return config + + def vpe_rxq(self, config): + for port in self.pub_ports: + new_section = 'RXQ{0}.0'.format(port) + config.add_section(new_section) + config.set(new_section, 'mempool', 'MEMPOOL1') + + return config + + def get_sink_swq(self, parser, pipeline, k, index): + sink = "" + pktq = parser.get(pipeline, k) + if "SINK" in pktq: + self.sink_q += 1 + sink = " SINK{0}".format(self.sink_q) + if "TM" in pktq: + sink = " TM{0}".format(index) + pktq = "SWQ{0}{1}".format(self.sw_q, sink) + return pktq + + def vpe_upstream(self, vnf_cfg, intf): + parser = configparser.ConfigParser() + parser.read(os.path.join(vnf_cfg, 'vpe_upstream')) + for pipeline in parser.sections(): + for k, v in parser.items(pipeline): + if k == "pktq_in": + index = intf['index'] + if "RXQ" in v: + value = "RXQ{0}.0".format(index) + else: + value = self.get_sink_swq(parser, pipeline, k, index) + + parser.set(pipeline, k, value) + + elif k == "pktq_out": + index = intf['peer_intf']['index'] + if "TXQ" in v: + value = "TXQ{0}.0".format(index) + else: + self.sw_q += 1 + value = self.get_sink_swq(parser, pipeline, k, index) + + parser.set(pipeline, k, value) + + new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline) + if new_pipeline != pipeline: + parser._sections[new_pipeline] = parser._sections[pipeline] + parser._sections.pop(pipeline) + self.n_pipeline += 1 + return parser + + def vpe_downstream(self, vnf_cfg, intf): + parser = configparser.ConfigParser() + parser.read(os.path.join(vnf_cfg, 'vpe_downstream')) + for pipeline in parser.sections(): + for k, v in parser.items(pipeline): + index = intf['dpdk_port_num'] + peer_index = intf['peer_intf']['dpdk_port_num'] + + if k == "pktq_in": + if "RXQ" not in v: + value = self.get_sink_swq(parser, pipeline, k, index) + elif "TM" in v: + value = "RXQ{0}.0 TM{1}".format(peer_index, index) + else: + value = "RXQ{0}.0".format(peer_index) + + parser.set(pipeline, k, value) + + if k == "pktq_out": + if "TXQ" not in v: + self.sw_q += 1 + value = self.get_sink_swq(parser, pipeline, k, index) + elif "TM" in v: + value = "TXQ{0}.0 TM{1}".format(peer_index, index) + else: + value = "TXQ{0}.0".format(peer_index) + + parser.set(pipeline, k, value) + + new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline) + if new_pipeline != pipeline: + parser._sections[new_pipeline] = parser._sections[pipeline] + parser._sections.pop(pipeline) + self.n_pipeline += 1 + return parser + + def create_vpe_config(self, vnf_cfg): + config = configparser.ConfigParser() + vpe_cfg = os.path.join("/tmp/vpe_config") + with open(vpe_cfg, 'w') as cfg_file: + config = self.vpe_initialize(config) + config = self.vpe_rxq(config) + config.write(cfg_file) + for index, priv_port in enumerate(self.priv_ports): + config = self.vpe_upstream(vnf_cfg, priv_port) + config.write(cfg_file) + config = self.vpe_downstream(vnf_cfg, priv_port) + config = self.vpe_tmq(config, index) + config.write(cfg_file) + + def generate_vpe_script(self, interfaces): + rules = PipelineRules(pipeline_id=1) + for priv_port, pub_port in zip(self.priv_ports, self.pub_ports): + priv_intf = interfaces[priv_port]["virtual-interface"] + pub_intf = interfaces[pub_port]["virtual-interface"] + + dst_port0_ip = priv_intf["dst_ip"] + dst_port1_ip = pub_intf["dst_ip"] + dst_port0_mac = priv_intf["dst_mac"] + dst_port1_mac = pub_intf["dst_mac"] + + rules.add_firewall_script(dst_port0_ip) + rules.next_pipeline() + rules.add_flow_classification_script() + rules.next_pipeline() + rules.add_flow_action() + rules.next_pipeline() + rules.add_flow_action2() + rules.next_pipeline() + rules.add_route_script(dst_port1_ip, dst_port1_mac) + rules.next_pipeline() + rules.add_route_script2(dst_port0_ip, dst_port0_mac) + rules.next_pipeline(num=4) + + return rules.get_string() + + +class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): + + CFG_CONFIG = "/tmp/vpe_config" + CFG_SCRIPT = "/tmp/vpe_script" + CORES = ['0', '1', '2', '3', '4', '5'] + PIPELINE_COMMAND = VPE_PIPELINE_COMMAND + + def build_config(self): + vpe_vars = { + "bin_path": self.ssh_helper.bin_path, + "socket": self.socket, + } + + all_ports = [] + priv_ports = [] + pub_ports = [] + for interface in self.vnfd_helper.interfaces: + all_ports.append(interface['name']) + vld_id = interface['virtual-interface']['vld_id'] + if vld_id.startswith('private'): + priv_ports.append(interface) + elif vld_id.startswith('public'): + pub_ports.append(interface) + + vpe_conf = ConfigCreate(priv_ports, pub_ports, self.socket) + vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg) + + config_basename = posixpath.basename(self.CFG_CONFIG) + script_basename = posixpath.basename(self.CFG_SCRIPT) + with open(self.CFG_CONFIG) as handle: + vpe_config = handle.read() + + self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars)) + + vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces) + self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars)) + + +class VpeApproxVnf(SampleVNF): """ This class handles vPE VNF model-driver definitions """ - def __init__(self, vnfd): - super(VpeApproxVnf, self).__init__(vnfd) - self.socket = None - self.q_in = Queue() - self.q_out = Queue() - self.vnf_cfg = None - self._vnf_process = None - self.connection = None - self.resource = None - - def _resource_collect_start(self): - self.resource.initiate_systemagent(self.bin_path) - self.resource.start() + APP_NAME = 'vPE_vnf' + APP_WORD = 'vpe' + COLLECT_KPI = VPE_COLLECT_KPI + WAIT_TIME = 20 - def _resource_collect_stop(self): - self.resource.stop() + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = VpeApproxSetupEnvHelper - def _collect_resource_kpi(self): - result = {} + super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) - status = self.resource.check_if_sa_running("collectd")[0] - if status: - result = self.resource.amqp_collect_nfvi_kpi() - - result = {"core": result} - - return result - - @classmethod - def __setup_hugepages(cls, connection): - hugepages = \ - connection.execute( - "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1] - hugepages = hugepages.rstrip() - - memory_path = \ - '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages - connection.execute("awk -F: '{ print $1 }' < %s" % memory_path) - - pages = 16384 if hugepages.rstrip() == "2048kB" else 16 - connection.execute("echo %s > %s" % (pages, memory_path)) - - def setup_vnf_environment(self, connection): - ''' setup dpdk environment needed for vnf to run ''' - - self.__setup_hugepages(connection) - connection.execute("modprobe uio && modprobe igb_uio") - - exit_status = connection.execute("lsmod | grep -i igb_uio")[0] - if exit_status == 0: - return - - dpdk = os.path.join(self.bin_path, "dpdk-16.07") - dpdk_setup = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "nsb_setup.sh")) - status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0] - if status: - connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) - - def _get_cpu_sibling_list(self): - cpu_topo = [] - for core in CORES: - sys_cmd = \ - "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \ - % core - cpuid = \ - self.connection.execute("awk -F: '{ print $1 }' < %s" % - sys_cmd)[1] - cpu_topo += \ - [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')] - - return [cpu.strip() for cpu in cpu_topo] - - def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(VpeApproxVnf, self).scale(flavor) - - def instantiate(self, scenario_cfg, context_cfg): - vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg'] - - mgmt_interface = self.vnfd["mgmt-interface"] - self.connection = ssh.SSH.from_node(mgmt_interface) - - self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc']) - - self.setup_vnf_environment(self.connection) - - cores = self._get_cpu_sibling_list() - self.resource = ResourceProfile(self.vnfd, cores) - - self.connection.execute("pkill vPE_vnf") - dpdk_nic_bind = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "dpdk_nic_bind.py")) - - interfaces = self.vnfd["vdu"][0]['external-interface'] - self.socket = \ - next((0 for v in interfaces - if v['virtual-interface']["vpci"][5] == "0"), 1) - - bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] - for vpci in bound_pci: - self.connection.execute( - "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci)) - queue_wrapper = \ - QueueFileWrapper(self.q_in, self.q_out, "pipeline>") - self._vnf_process = multiprocessing.Process(target=self._run_vpe, - args=(queue_wrapper, - vnf_cfg,)) - self._vnf_process.start() - buf = [] - time.sleep(WAIT_TIME) # Give some time for config to load - while True: - message = '' - while self.q_out.qsize() > 0: - buf.append(self.q_out.get()) - message = ''.join(buf) - if "pipeline>" in message: - LOG.info("VPE VNF is up and running.") - queue_wrapper.clear() - self._resource_collect_start() - return self._vnf_process.exitcode - if "PANIC" in message: - raise RuntimeError("Error starting vPE VNF.") - - LOG.info("Waiting for VNF to start.. ") - time.sleep(3) - if not self._vnf_process.is_alive(): - raise RuntimeError("vPE VNF process died.") - - def _get_ports_gateway(self, name): - if 'routing_table' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['routing_table'] - - for route in routing_table: - if name == route['if']: - return route['gateway'] - - def terminate(self): - self.execute_command("quit") - if self._vnf_process: - self._vnf_process.terminate() - - def _run_vpe(self, filewrapper, vnf_cfg): - mgmt_interface = self.vnfd["mgmt-interface"] - - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - - interfaces = self.vnfd["vdu"][0]['external-interface'] - port0_ip = ipaddress.ip_interface(six.text_type( - "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"], - interfaces[0]["virtual-interface"]["netmask"]))) - port1_ip = ipaddress.ip_interface(six.text_type( - "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"], - interfaces[1]["virtual-interface"]["netmask"]))) - dst_port0_ip = ipaddress.ip_interface( - u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"], - interfaces[0]["virtual-interface"]["netmask"])) - dst_port1_ip = ipaddress.ip_interface( - u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"], - interfaces[1]["virtual-interface"]["netmask"])) - - vpe_vars = {"port0_local_ip": port0_ip.ip.exploded, - "port0_dst_ip": dst_port0_ip.ip.exploded, - "port0_local_ip_hex": - self._ip_to_hex(port0_ip.ip.exploded), - "port0_prefixlen": port0_ip.network.prefixlen, - "port0_netmask": port0_ip.network.netmask.exploded, - "port0_netmask_hex": - self._ip_to_hex(port0_ip.network.netmask.exploded), - "port0_local_mac": - interfaces[0]["virtual-interface"]["local_mac"], - "port0_dst_mac": - interfaces[0]["virtual-interface"]["dst_mac"], - "port0_gateway": - self._get_ports_gateway(interfaces[0]["name"]), - "port0_local_network": - port0_ip.network.network_address.exploded, - "port0_prefix": port0_ip.network.prefixlen, - "port1_local_ip": port1_ip.ip.exploded, - "port1_dst_ip": dst_port1_ip.ip.exploded, - "port1_local_ip_hex": - self._ip_to_hex(port1_ip.ip.exploded), - "port1_prefixlen": port1_ip.network.prefixlen, - "port1_netmask": port1_ip.network.netmask.exploded, - "port1_netmask_hex": - self._ip_to_hex(port1_ip.network.netmask.exploded), - "port1_local_mac": - interfaces[1]["virtual-interface"]["local_mac"], - "port1_dst_mac": - interfaces[1]["virtual-interface"]["dst_mac"], - "port1_gateway": - self._get_ports_gateway(interfaces[1]["name"]), - "port1_local_network": - port1_ip.network.network_address.exploded, - "port1_prefix": port1_ip.network.prefixlen, - "port0_local_ip6": self._get_port0localip6(), - "port1_local_ip6": self._get_port1localip6(), - "port0_prefixlen6": self._get_port0prefixlen6(), - "port1_prefixlen6": self._get_port1prefixlen6(), - "port0_gateway6": self._get_port0gateway6(), - "port1_gateway6": self._get_port1gateway6(), - "port0_dst_ip_hex6": self._get_port0localip6(), - "port1_dst_ip_hex6": self._get_port1localip6(), - "port0_dst_netmask_hex6": self._get_port0prefixlen6(), - "port1_dst_netmask_hex6": self._get_port1prefixlen6(), - "bin_path": self.bin_path, - "socket": self.socket} - - for cfg in os.listdir(vnf_cfg): - vpe_config = "" - with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg: - vpe_config = vpe_cfg.read() - - self._provide_config_file(cfg, vpe_config, vpe_vars) - - LOG.info("Provision and start the vPE") - tool_path = provision_tool(self.connection, - os.path.join(self.bin_path, "vPE_vnf")) - cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config", - script="/tmp/vpe_script", - tool_path=tool_path) - self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper, - keep_stdin_open=True, pty=True) - - def _provide_config_file(self, prefix, template, args): - cfg, cfg_content = tempfile.mkstemp() - cfg = os.fdopen(cfg, "w+") - cfg.write(template.format(**args)) - cfg.close() - cfg_file = "/tmp/%s" % prefix - self.connection.put(cfg_content, cfg_file) - return cfg_file - - def execute_command(self, cmd): - ''' send cmd to vnf process ''' - LOG.info("VPE command: %s", cmd) - output = [] - if self.q_in: - self.q_in.put(cmd + "\r\n") - time.sleep(3) - while self.q_out.qsize() > 0: - output.append(self.q_out.get()) - return "".join(output) + def get_stats(self, *args, **kwargs): + raise NotImplementedError def collect_kpi(self): - result = self.get_stats_vpe() - collect_stats = self._collect_resource_kpi() - result["collect_stats"] = collect_stats - LOG.debug("vPE collet Kpis: %s", result) - return result - - def get_stats_vpe(self): - ''' get vpe statistics ''' - result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0, - 'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0} - up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0', - 'p 5 stats port out 1'] - down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0'] - pattern = \ - "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \ - "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)" - - for cmd in up_stat_commands: - stats = self.execute_command(cmd) - match = re.search(pattern, stats, re.MULTILINE) - if match: - result["pkt_in_up_stream"] = \ - result.get("pkt_in_up_stream", 0) + int(match.group(1)) - result["pkt_drop_up_stream"] = \ - result.get("pkt_drop_up_stream", 0) + \ - int(match.group(2)) + int(match.group(3)) - - for cmd in down_stat_commands: - stats = self.execute_command(cmd) - match = re.search(pattern, stats, re.MULTILINE) - if match: - result["pkt_in_down_stream"] = \ - result.get("pkt_in_down_stream", 0) + int(match.group(1)) - result["pkt_drop_down_stream"] = \ - result.get("pkt_drop_down_stream", 0) + \ - int(match.group(2)) + int(match.group(3)) + result = { + 'pkt_in_up_stream': 0, + 'pkt_drop_up_stream': 0, + 'pkt_in_down_stream': 0, + 'pkt_drop_down_stream': 0, + 'collect_stats': self.resource_helper.collect_kpi(), + } + + indexes_in = [1] + indexes_drop = [2, 3] + command = 'p {0} stats port {1} 0' + for index, direction in ((5, 'up'), (9, 'down')): + key_in = "pkt_in_{0}_stream".format(direction) + key_drop = "pkt_drop_{0}_stream".format(direction) + for mode in ('in', 'out'): + stats = self.vnf_execute(command.format(index, mode)) + match = re.search(self.COLLECT_KPI, stats, re.MULTILINE) + if not match: + continue + result[key_in] += sum(int(match.group(x)) for x in indexes_in) + result[key_drop] += sum(int(match.group(x)) for x in indexes_drop) + + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result diff --git a/yardstick/network_services/vnf_generic/vnfdgen.py b/yardstick/network_services/vnf_generic/vnfdgen.py index b56a91915..0120b493e 100644 --- a/yardstick/network_services/vnf_generic/vnfdgen.py +++ b/yardstick/network_services/vnf_generic/vnfdgen.py @@ -14,11 +14,16 @@ """ Generic file to map and build vnf discriptor """ from __future__ import absolute_import -import collections +from functools import reduce import jinja2 +import logging import yaml +from yardstick.common.utils import try_int + +LOG = logging.getLogger(__name__) + def render(vnf_model, **kwargs): """Render jinja2 VNF template @@ -40,7 +45,8 @@ def generate_vnfd(vnf_model, node): as input for GenericVNF.__init__ """ # get is unused as global method inside template - node["get"] = get + # node["get"] = key_flatten_get + node["get"] = deepgetitem # Set Node details to default if not defined in pod file # we CANNOT use TaskTemplate.render because it does not allow # for missing variables, we need to allow password for key_filename @@ -52,36 +58,34 @@ def generate_vnfd(vnf_model, node): return filled_vnfd -def dict_key_flatten(data): - """ Convert nested dict structure to dotted key - (e.g. {"a":{"b":1}} -> {"a.b":1} - - :param data: nested dictionary - :return: flat dicrionary - """ - next_data = {} - - # check for non-string iterables - if not any((isinstance(v, collections.Iterable) and not isinstance(v, str)) - for v in data.values()): - return data +# dict_flatten was causing recursion errors with Jinja2 so we removed and replaced +# which this function from stackoverflow that doesn't require generating entire dictionaries +# each time we query a key +def deepgetitem(obj, item, default=None): + """Steps through an item chain to get the ultimate value. - for key, val in data.items(): - if isinstance(val, collections.Mapping): - for n_k, n_v in val.items(): - next_data["%s.%s" % (key, n_k)] = n_v - elif isinstance(val, collections.Iterable) and not isinstance(val, - str): - for index, item in enumerate(val): - next_data["%s%d" % (key, index)] = item - else: - next_data[key] = val + If ultimate value or path to value does not exist, does not raise + an exception and instead returns `fallback`. - return dict_key_flatten(next_data) + Based on + https://stackoverflow.com/a/38623359 + https://stackoverflow.com/users/1820042/donny-winston + add try_int to work with sequences -def get(obj, key, *args): - """ Get template key from dictionary, get default value or raise an exception + >>> d = {'snl_final': {'about': {'_icsd': {'icsd_id': 1, 'fr': [2, 3]}}}} + >>> deepgetitem(d, 'snl_final.about._icsd.icsd_id') + 1 + >>> deepgetitem(d, 'snl_final.about._sandbox.sbx_id') + >>> + >>> deepgetitem(d, 'snl_final.about._icsd.fr.1') + 3 """ - data = dict_key_flatten(obj) - return data.get(key, *args) + def getitem(obj, name): + # if integer then list index + name = try_int(name) + try: + return obj[name] + except (KeyError, TypeError, IndexError): + return default + return reduce(getitem, item.split('.'), obj) diff --git a/yardstick/network_services/yang_model.py b/yardstick/network_services/yang_model.py new file mode 100644 index 000000000..fbf224bd8 --- /dev/null +++ b/yardstick/network_services/yang_model.py @@ -0,0 +1,107 @@ +# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import print_function
+import logging
+import ipaddress
+import yaml
+import six
+
+LOG = logging.getLogger(__name__)
+
+
+class YangModel(object):
+
+ RULE_TEMPLATE = "p acl add 1 {0} {1} {2} {3} {4} {5} {6} {7} 0 0 {8}"
+
+ def __init__(self, config_file):
+ super(YangModel, self).__init__()
+ self._config_file = config_file
+ self._options = {}
+ self._rules = ''
+
+ @property
+ def config_file(self):
+ return self._config_file
+
+ @config_file.setter
+ def config_file(self, value):
+ self._config_file = value
+ self._options = {}
+ self._rules = ''
+
+ def _read_config(self):
+ # TODO: add some error handling in case of empty or non-existing file
+ try:
+ with open(self._config_file) as f:
+ self._options = yaml.safe_load(f)
+ except Exception as e:
+ LOG.exception("Failed to load the yaml %s", e)
+ raise
+
+ def _get_entries(self):
+ if not self._options:
+ return ''
+
+ rule_list = []
+ for ace in self._options['access-list1']['acl']['access-list-entries']:
+ # TODO: resolve ports using topology file and nodes'
+ # ids: public or private.
+ matches = ace['ace']['matches']
+ dst_ipv4_net = matches['destination-ipv4-network']
+ dst_ipv4_net_ip = ipaddress.ip_interface(six.text_type(dst_ipv4_net))
+ port0_local_network = dst_ipv4_net_ip.network.network_address.exploded
+ port0_prefix = dst_ipv4_net_ip.network.prefixlen
+
+ src_ipv4_net = matches['source-ipv4-network']
+ src_ipv4_net_ip = ipaddress.ip_interface(six.text_type(src_ipv4_net))
+ port1_local_network = src_ipv4_net_ip.network.network_address.exploded
+ port1_prefix = src_ipv4_net_ip.network.prefixlen
+
+ lower_dport = matches['destination-port-range']['lower-port']
+ upper_dport = matches['destination-port-range']['upper-port']
+
+ lower_sport = matches['source-port-range']['lower-port']
+ upper_sport = matches['source-port-range']['upper-port']
+
+ # TODO: proto should be read from file also.
+ # Now all rules in sample ACL file are TCP.
+ rule_list.append('') # get an extra new line
+ rule_list.append(self.RULE_TEMPLATE.format(port0_local_network,
+ port0_prefix,
+ port1_local_network,
+ port1_prefix,
+ lower_dport,
+ upper_dport,
+ lower_sport,
+ upper_sport,
+ 0))
+ rule_list.append(self.RULE_TEMPLATE.format(port1_local_network,
+ port1_prefix,
+ port0_local_network,
+ port0_prefix,
+ lower_sport,
+ upper_sport,
+ lower_dport,
+ upper_dport,
+ 1))
+
+ self._rules = '\n'.join(rule_list)
+
+ def get_rules(self):
+ if not self._rules:
+ self._read_config()
+ self._get_entries()
+ return self._rules
|