aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/network_services
diff options
context:
space:
mode:
Diffstat (limited to 'yardstick/network_services')
-rw-r--r--yardstick/network_services/helpers/__init__.py0
-rw-r--r--yardstick/network_services/helpers/cpu.py76
-rw-r--r--yardstick/network_services/helpers/samplevnf_helper.py639
-rw-r--r--yardstick/network_services/libs/__init__.py0
-rw-r--r--yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py335
-rw-r--r--yardstick/network_services/libs/ixia_libs/IxNet/__init__.py0
-rw-r--r--yardstick/network_services/libs/ixia_libs/__init__.py0
-rw-r--r--yardstick/network_services/nfvi/collectd.conf59
-rwxr-xr-xyardstick/network_services/nfvi/collectd.sh73
-rw-r--r--yardstick/network_services/nfvi/resource.py227
-rw-r--r--yardstick/network_services/pipeline.py113
-rw-r--r--yardstick/network_services/traffic_profile/http_ixload.py294
-rw-r--r--yardstick/network_services/traffic_profile/ixia_rfc2544.py155
-rw-r--r--yardstick/network_services/traffic_profile/rfc2544.py210
-rw-r--r--yardstick/network_services/traffic_profile/traffic_profile.py13
-rw-r--r--yardstick/network_services/utils.py8
-rw-r--r--yardstick/network_services/vnf_generic/vnf/acl_vnf.py72
-rw-r--r--yardstick/network_services/vnf_generic/vnf/base.py234
-rw-r--r--yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py123
-rw-r--r--yardstick/network_services/vnf_generic/vnf/sample_vnf.py994
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_ixload.py176
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_ping.py155
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py165
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py303
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_trex.py345
-rw-r--r--yardstick/network_services/vnf_generic/vnf/vfw_vnf.py67
-rw-r--r--yardstick/network_services/vnf_generic/vnf/vpe_vnf.py545
-rw-r--r--yardstick/network_services/vnf_generic/vnfdgen.py64
-rw-r--r--yardstick/network_services/yang_model.py107
29 files changed, 4332 insertions, 1220 deletions
diff --git a/yardstick/network_services/helpers/__init__.py b/yardstick/network_services/helpers/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/yardstick/network_services/helpers/__init__.py
diff --git a/yardstick/network_services/helpers/cpu.py b/yardstick/network_services/helpers/cpu.py
new file mode 100644
index 000000000..a5ba6c31e
--- /dev/null
+++ b/yardstick/network_services/helpers/cpu.py
@@ -0,0 +1,76 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class CpuSysCores(object):
+
+ def __init__(self, connection=""):
+ self.core_map = {}
+ self.connection = connection
+
+ def _open_cpuinfo(self):
+ lines = []
+ lines = self.connection.execute("cat /proc/cpuinfo")[1].split(u'\n')
+ return lines
+
+ def _get_core_details(self, lines):
+ core_details = []
+ core_lines = {}
+ for line in lines:
+ if line.strip():
+ name, value = line.split(":", 1)
+ core_lines[name.strip()] = value.strip()
+ else:
+ core_details.append(core_lines)
+ core_lines = {}
+
+ return core_details
+
+ def get_core_socket(self):
+ lines = self.connection.execute("lscpu")[1].split(u'\n')
+ num_cores = self._get_core_details(lines)
+ for num in num_cores:
+ self.core_map["cores_per_socket"] = num["Core(s) per socket"]
+ self.core_map["thread_per_core"] = num["Thread(s) per core"]
+
+ lines = self._open_cpuinfo()
+ core_details = self._get_core_details(lines)
+ for core in core_details:
+ for k, v in core.items():
+ if k == "physical id":
+ if core["physical id"] not in self.core_map:
+ self.core_map[core['physical id']] = []
+ self.core_map[core['physical id']].append(
+ core["processor"])
+
+ return self.core_map
+
+ def validate_cpu_cfg(self, vnf_cfg=None):
+ if vnf_cfg is None:
+ vnf_cfg = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1
+ }
+ if self.core_map["thread_per_core"] == 1 and \
+ vnf_cfg["worker_config"] == "1C/2T":
+ return -1
+
+ if vnf_cfg['lb_config'] == 'SW':
+ num_cpu = int(vnf_cfg["worker_threads"]) + 5
+ if int(self.core_map["cores_per_socket"]) < num_cpu:
+ return -1
+
+ return 0
diff --git a/yardstick/network_services/helpers/samplevnf_helper.py b/yardstick/network_services/helpers/samplevnf_helper.py
new file mode 100644
index 000000000..1eefc5ffa
--- /dev/null
+++ b/yardstick/network_services/helpers/samplevnf_helper.py
@@ -0,0 +1,639 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+import ipaddress
+import logging
+import os
+import sys
+from collections import OrderedDict, defaultdict
+from itertools import chain
+
+import six
+from six.moves.configparser import ConfigParser
+
+from yardstick.common.utils import ip_to_hex
+
+LOG = logging.getLogger(__name__)
+
+LINK_CONFIG_TEMPLATE = """\
+link {0} down
+link {0} config {1} {2}
+link {0} up
+"""
+
+ACTION_TEMPLATE = """\
+p action add {0} accept
+p action add {0} fwd
+p action add {0} count
+"""
+
+FW_ACTION_TEMPLATE = """\
+p action add {0} accept
+p action add {0} fwd
+p action add {0} count
+p action add {0} conntrack
+"""
+
+# This sets up a basic passthrough with no rules
+SCRIPT_TPL = """
+{link_config}
+
+{arp_config}
+
+{arp_config6}
+
+{actions}
+
+{rules}
+
+"""
+
+
+class MultiPortConfig(object):
+
+ HW_LB = "HW"
+
+ @staticmethod
+ def float_x_plus_one_tenth_of_y(x, y):
+ return float(x) + float(y) / 10.0
+
+ @staticmethod
+ def make_str(base, iterator):
+ return ' '.join((base.format(x) for x in iterator))
+
+ @classmethod
+ def make_range_str(cls, base, start, stop=0, offset=0):
+ if offset and not stop:
+ stop = start + offset
+ return cls.make_str(base, range(start, stop))
+
+ @staticmethod
+ def parser_get(parser, section, key, default=None):
+ if parser.has_option(section, key):
+ return parser.get(section, key)
+ return default
+
+ @staticmethod
+ def make_ip_addr(ip, mask_len):
+ try:
+ return ipaddress.ip_interface(six.text_type('/'.join([ip, mask_len])))
+ except ValueError:
+ # None so we can skip later
+ return None
+
+ @classmethod
+ def validate_ip_and_prefixlen(cls, ip_addr, prefixlen):
+ ip_addr = cls.make_ip_addr(ip_addr, prefixlen)
+ return ip_addr.ip.exploded, ip_addr.network.prefixlen
+
+ def __init__(self, topology_file, config_tpl, tmp_file, interfaces=None,
+ vnf_type='CGNAT', lb_count=2, worker_threads=3,
+ worker_config='1C/1T', lb_config='SW', socket=0):
+
+ super(MultiPortConfig, self).__init__()
+ self.topology_file = topology_file
+ self.worker_config = worker_config.split('/')[1].lower()
+ self.worker_threads = self.get_worker_threads(worker_threads)
+ self.vnf_type = vnf_type
+ self.pipe_line = 0
+ self.interfaces = interfaces if interfaces else {}
+ self.networks = {}
+ self.write_parser = ConfigParser()
+ self.read_parser = ConfigParser()
+ self.read_parser.read(config_tpl)
+ self.master_core = self.read_parser.get("PIPELINE0", "core")
+ self.master_tpl = self.get_config_tpl_data('MASTER')
+ self.arpicmp_tpl = self.get_config_tpl_data('ARPICMP')
+ self.txrx_tpl = self.get_config_tpl_data('TXRX')
+ self.loadb_tpl = self.get_config_tpl_data('LOADB')
+ self.vnf_tpl = self.get_config_tpl_data(vnf_type)
+ self.swq = 0
+ self.lb_count = int(lb_count)
+ self.lb_config = lb_config
+ self.tmp_file = os.path.join("/tmp", tmp_file)
+ self.pktq_out_os = []
+ self.socket = socket
+ self.start_core = ""
+ self.pipeline_counter = ""
+ self.txrx_pipeline = ""
+ self.port_pair_list = []
+ self.lb_to_port_pair_mapping = {}
+ self.init_eal()
+
+ self.lb_index = None
+ self.mul = 0
+ self.port_pairs = []
+ self.port_pair_list = []
+ self.ports_len = 0
+ self.prv_que_handler = None
+ self.vnfd = None
+ self.rules = None
+ self.pktq_out = ''
+
+ @staticmethod
+ def gen_core(core):
+ # return "s{}c{}".format(self.socket, core)
+ # don't use sockets for VNFs, because we don't want to have to
+ # adjust VM CPU topology. It is virtual anyway
+ return str(core)
+
+ def make_port_pairs_iter(self, operand, iterable):
+ return (operand(x[-1], y) for y in iterable for x in chain(*self.port_pairs))
+
+ def make_range_port_pairs_iter(self, operand, start, end):
+ return self.make_port_pairs_iter(operand, range(start, end))
+
+ def init_eal(self):
+ vpci = [v['virtual-interface']["vpci"] for v in self.interfaces]
+ with open(self.tmp_file, 'w') as fh:
+ fh.write('[EAL]\n')
+ for item in vpci:
+ fh.write('w = {0}\n'.format(item))
+ fh.write('\n')
+
+ def update_timer(self):
+ timer_tpl = self.get_config_tpl_data('TIMER')
+ timer_tpl['core'] = self.gen_core(self.start_core)
+ self.update_write_parser(timer_tpl)
+ self.start_core += 1
+
+ def get_config_tpl_data(self, type_value):
+ for section in self.read_parser.sections():
+ if self.read_parser.has_option(section, 'type'):
+ if type_value == self.read_parser.get(section, 'type'):
+ tpl = OrderedDict(self.read_parser.items(section))
+ return tpl
+
+ def get_txrx_tpl_data(self, value):
+ for section in self.read_parser.sections():
+ if self.read_parser.has_option(section, 'pipeline_txrx_type'):
+ if value == self.read_parser.get(section, 'pipeline_txrx_type'):
+ tpl = OrderedDict(self.read_parser.items(section))
+ return tpl
+
+ def init_write_parser_template(self, type_value='ARPICMP'):
+ for section in self.read_parser.sections():
+ if type_value == self.parser_get(self.read_parser, section, 'type', object()):
+ self.start_core = self.read_parser.getint(section, 'core')
+ self.pipeline_counter = self.read_parser.getint(section, 'core')
+ self.txrx_pipeline = self.read_parser.getint(section, 'core')
+ return
+ self.write_parser.add_section(section)
+ for name, value in self.read_parser.items(section):
+ self.write_parser.set(section, name, value)
+
+ def update_write_parser(self, data):
+ section = "PIPELINE{0}".format(self.pipeline_counter)
+ self.write_parser.add_section(section)
+ for name, value in data.items():
+ self.write_parser.set(section, name, value)
+
+ def get_worker_threads(self, worker_threads):
+ if self.worker_config == '1t':
+ return worker_threads
+ else:
+ return worker_threads - worker_threads % 2
+
+ def generate_next_core_id(self):
+ if self.worker_config == '1t':
+ self.start_core += 1
+ return
+
+ try:
+ self.start_core = 'h{}'.format(int(self.start_core))
+ except ValueError:
+ self.start_core = int(self.start_core[:-1]) + 1
+
+ @staticmethod
+ def get_port_pairs(interfaces):
+ port_pair_list = []
+ networks = defaultdict(list)
+ for private_intf in interfaces:
+ vintf = private_intf['virtual-interface']
+ networks[vintf['vld_id']].append(vintf)
+
+ for name, net in networks.items():
+ # partition returns a tuple
+ parts = list(name.partition('private'))
+ if parts[0]:
+ # 'private' was not in or not leftmost in the string
+ continue
+ parts[1] = 'public'
+ public_id = ''.join(parts)
+ for private_intf in net:
+ try:
+ public_peer_intfs = networks[public_id]
+ except KeyError:
+ LOG.warning("private network without peer %s, %s not found", name, public_id)
+ continue
+
+ for public_intf in public_peer_intfs:
+ port_pair = private_intf["ifname"], public_intf["ifname"]
+ port_pair_list.append(port_pair)
+
+ return port_pair_list, networks
+
+ def get_lb_count(self):
+ self.lb_count = int(min(len(self.port_pair_list), self.lb_count))
+
+ def generate_lb_to_port_pair_mapping(self):
+ self.lb_to_port_pair_mapping = defaultdict(int)
+ port_pair_count = len(self.port_pair_list)
+ lb_pair_count = int(port_pair_count / self.lb_count)
+ for i in range(self.lb_count):
+ self.lb_to_port_pair_mapping[i + 1] = lb_pair_count
+ for i in range(port_pair_count % self.lb_count):
+ self.lb_to_port_pair_mapping[i + 1] += 1
+
+ def set_priv_to_pub_mapping(self):
+ return "".join(str(y) for y in [(int(x[0][-1]), int(x[1][-1])) for x in
+ self.port_pair_list])
+
+ def set_priv_que_handler(self):
+ # iterated twice, can't be generator
+ priv_to_pub_map = [(int(x[0][-1]), int(x[1][-1])) for x in self.port_pairs]
+ # must be list to use .index()
+ port_list = list(chain.from_iterable(priv_to_pub_map))
+ priv_ports = (x[0] for x in priv_to_pub_map)
+ self.prv_que_handler = '({})'.format(
+ ",".join((str(port_list.index(x)) for x in priv_ports)))
+
+ def generate_arp_route_tbl(self):
+ arp_config = []
+ arp_route_tbl_tmpl = "({port0_dst_ip_hex},{port0_netmask_hex},{port_num}," \
+ "{next_hop_ip_hex})"
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ port_num = int(port[-1])
+ interface = self.interfaces[port_num]
+ # port0_ip = ipaddress.ip_interface(six.text_type(
+ # "%s/%s" % (interface["virtual-interface"]["local_ip"],
+ # interface["virtual-interface"]["netmask"])))
+ dst_port0_ip = \
+ ipaddress.ip_interface(six.text_type(
+ "%s/%s" % (interface["virtual-interface"]["dst_ip"],
+ interface["virtual-interface"]["netmask"])))
+ arp_vars = {
+ "port0_dst_ip_hex": ip_to_hex(dst_port0_ip.ip.exploded),
+ "port0_netmask_hex": ip_to_hex(dst_port0_ip.network.netmask.exploded),
+ "port_num": port_num,
+ # next hop is dst in this case
+ "next_hop_ip_hex": ip_to_hex(dst_port0_ip.ip.exploded),
+ }
+ arp_config.append(arp_route_tbl_tmpl.format(**arp_vars))
+
+ return ' '.join(arp_config)
+
+ def generate_arpicmp_data(self):
+ swq_in_str = self.make_range_str('SWQ{}', self.swq, offset=self.lb_count)
+ self.swq += self.lb_count
+ swq_out_str = self.make_range_str('SWQ{}', self.swq, offset=self.lb_count)
+ self.swq += self.lb_count
+ mac_iter = (self.interfaces[int(x[-1])]['virtual-interface']['local_mac']
+ for port_pair in self.port_pair_list for x in port_pair)
+ pktq_in_iter = ('RXQ{}'.format(float(x[0][-1])) for x in self.port_pair_list)
+
+ arpicmp_data = {
+ 'core': self.gen_core(self.start_core),
+ 'pktq_in': swq_in_str,
+ 'pktq_out': swq_out_str,
+ 'ports_mac_list': ' '.join(mac_iter),
+ 'pktq_in_prv': ' '.join(pktq_in_iter),
+ 'prv_to_pub_map': self.set_priv_to_pub_mapping(),
+ 'arp_route_tbl': self.generate_arp_route_tbl(),
+ # can't use empty string, defaul to ()
+ 'nd_route_tbl': "()",
+ }
+ self.pktq_out_os = swq_out_str.split(' ')
+ # why?
+ if self.lb_config == self.HW_LB:
+ arpicmp_data['pktq_in'] = swq_in_str
+ self.swq = 0
+ return arpicmp_data
+
+ def generate_final_txrx_data(self):
+ swq_start = self.swq - self.ports_len * self.worker_threads
+
+ txq_start = 0
+ txq_end = self.worker_threads
+
+ pktq_out_iter = self.make_range_port_pairs_iter(self.float_x_plus_one_tenth_of_y,
+ txq_start, txq_end)
+
+ swq_str = self.make_range_str('SWQ{}', swq_start, self.swq)
+ txq_str = self.make_str('TXQ{}', pktq_out_iter)
+ rxtx_data = {
+ 'pktq_in': swq_str,
+ 'pktq_out': txq_str,
+ 'pipeline_txrx_type': 'TXTX',
+ 'core': self.gen_core(self.start_core),
+ }
+ pktq_in = rxtx_data['pktq_in']
+ pktq_in = '{0} {1}'.format(pktq_in, self.pktq_out_os[self.lb_index - 1])
+ rxtx_data['pktq_in'] = pktq_in
+ self.pipeline_counter += 1
+ return rxtx_data
+
+ def generate_initial_txrx_data(self):
+ pktq_iter = self.make_range_port_pairs_iter(self.float_x_plus_one_tenth_of_y,
+ 0, self.worker_threads)
+
+ rxq_str = self.make_str('RXQ{}', pktq_iter)
+ swq_str = self.make_range_str('SWQ{}', self.swq, offset=self.ports_len)
+ txrx_data = {
+ 'pktq_in': rxq_str,
+ 'pktq_out': swq_str + ' SWQ{0}'.format(self.lb_index - 1),
+ 'pipeline_txrx_type': 'RXRX',
+ 'core': self.gen_core(self.start_core),
+ }
+ self.pipeline_counter += 1
+ return txrx_data
+
+ def generate_lb_data(self):
+ pktq_in = self.make_range_str('SWQ{}', self.swq, offset=self.ports_len)
+ self.swq += self.ports_len
+
+ offset = self.ports_len * self.worker_threads
+ pktq_out = self.make_range_str('SWQ{}', self.swq, offset=offset)
+ self.pktq_out = pktq_out.split()
+
+ self.swq += (self.ports_len * self.worker_threads)
+ lb_data = {
+ 'prv_que_handler': self.prv_que_handler,
+ 'pktq_in': pktq_in,
+ 'pktq_out': pktq_out,
+ 'n_vnf_threads': str(self.worker_threads),
+ 'core': self.gen_core(self.start_core),
+ }
+ self.pipeline_counter += 1
+ return lb_data
+
+ def generate_vnf_data(self):
+ if self.lb_config == self.HW_LB:
+ port_iter = self.make_port_pairs_iter(self.float_x_plus_one_tenth_of_y, [self.mul])
+ pktq_in = self.make_str('RXQ{}', port_iter)
+
+ self.mul += 1
+ port_iter = self.make_port_pairs_iter(self.float_x_plus_one_tenth_of_y, [self.mul])
+ pktq_out = self.make_str('TXQ{}', port_iter)
+
+ pipe_line_data = {
+ 'pktq_in': pktq_in,
+ 'pktq_out': pktq_out + ' SWQ{0}'.format(self.swq),
+ 'prv_que_handler': self.prv_que_handler,
+ 'core': self.gen_core(self.start_core),
+ }
+ self.swq += 1
+ else:
+ pipe_line_data = {
+ 'pktq_in': ' '.join((self.pktq_out.pop(0) for _ in range(self.ports_len))),
+ 'pktq_out': self.make_range_str('SWQ{}', self.swq, offset=self.ports_len),
+ 'prv_que_handler': self.prv_que_handler,
+ 'core': self.gen_core(self.start_core),
+ }
+ self.swq += self.ports_len
+
+ if self.vnf_type in ('ACL', 'VFW'):
+ pipe_line_data.pop('prv_que_handler')
+
+ if self.vnf_tpl.get('vnf_set'):
+ public_ip_port_range_list = self.vnf_tpl['public_ip_port_range'].split(':')
+ ip_in_hex = '{:x}'.format(int(public_ip_port_range_list[0], 16) + self.lb_index - 1)
+ public_ip_port_range_list[0] = ip_in_hex
+ self.vnf_tpl['public_ip_port_range'] = ':'.join(public_ip_port_range_list)
+
+ self.pipeline_counter += 1
+ return pipe_line_data
+
+ def generate_config_data(self):
+ self.init_write_parser_template()
+
+ # use master core for master, don't use self.start_core
+ self.write_parser.set('PIPELINE0', 'core', self.gen_core(self.master_core))
+ arpicmp_data = self.generate_arpicmp_data()
+ self.arpicmp_tpl.update(arpicmp_data)
+ self.update_write_parser(self.arpicmp_tpl)
+
+ self.start_core += 1
+ if self.vnf_type == 'CGNAPT':
+ self.pipeline_counter += 1
+ self.update_timer()
+
+ for lb in self.lb_to_port_pair_mapping:
+ self.lb_index = lb
+ self.mul = 0
+ port_pair_count = self.lb_to_port_pair_mapping[lb]
+ if not self.port_pair_list:
+ continue
+
+ self.port_pairs = self.port_pair_list[:port_pair_count]
+ self.port_pair_list = self.port_pair_list[port_pair_count:]
+ self.ports_len = port_pair_count * 2
+ self.set_priv_que_handler()
+ if self.lb_config == 'SW':
+ txrx_data = self.generate_initial_txrx_data()
+ self.txrx_tpl.update(txrx_data)
+ self.update_write_parser(self.txrx_tpl)
+ self.start_core += 1
+ lb_data = self.generate_lb_data()
+ self.loadb_tpl.update(lb_data)
+ self.update_write_parser(self.loadb_tpl)
+ self.start_core += 1
+
+ for i in range(self.worker_threads):
+ vnf_data = self.generate_vnf_data()
+ if not self.vnf_tpl:
+ self.vnf_tpl = {}
+ self.vnf_tpl.update(vnf_data)
+ self.update_write_parser(self.vnf_tpl)
+ try:
+ self.vnf_tpl.pop('vnf_set')
+ except KeyError:
+ pass
+ else:
+ self.vnf_tpl.pop('public_ip_port_range')
+ self.generate_next_core_id()
+
+ if self.lb_config == 'SW':
+ txrx_data = self.generate_final_txrx_data()
+ self.txrx_tpl.update(txrx_data)
+ self.update_write_parser(self.txrx_tpl)
+ self.start_core += 1
+ self.vnf_tpl = self.get_config_tpl_data(self.vnf_type)
+
+ def generate_config(self):
+ self.port_pair_list, self.networks = self.get_port_pairs(self.interfaces)
+ self.get_lb_count()
+ self.generate_lb_to_port_pair_mapping()
+ self.generate_config_data()
+ self.write_parser.write(sys.stdout)
+ with open(self.tmp_file, 'a') as tfh:
+ self.write_parser.write(tfh)
+
+ def generate_link_config(self):
+
+ link_configs = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ port = port[-1]
+ virtual_interface = self.interfaces[int(port)]["virtual-interface"]
+ local_ip = virtual_interface["local_ip"]
+ netmask = virtual_interface["netmask"]
+ port_ip, prefix_len = self.validate_ip_and_prefixlen(local_ip, netmask)
+ link_configs.append(LINK_CONFIG_TEMPLATE.format(port, port_ip, prefix_len))
+
+ return ''.join(link_configs)
+
+ def get_route_data(self, src_key, data_key, port):
+ route_list = self.vnfd['vdu'][0].get(src_key, [])
+ return next((route[data_key] for route in route_list if route['if'] == port), None)
+
+ def get_ports_gateway(self, port):
+ return self.get_route_data('routing_table', 'gateway', port)
+
+ def get_ports_gateway6(self, port):
+ return self.get_route_data('nd_route_tbl', 'gateway', port)
+
+ def get_netmask_gateway(self, port):
+ return self.get_route_data('routing_table', 'netmask', port)
+
+ def get_netmask_gateway6(self, port):
+ return self.get_route_data('nd_route_tbl', 'netmask', port)
+
+ def generate_arp_config(self):
+ arp_config = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ gateway = self.get_ports_gateway(port)
+ # omit entries with no gateway
+ if not gateway:
+ continue
+ dst_mac = self.interfaces[int(port[-1])]["virtual-interface"]["dst_mac"]
+ arp_config.append((port[-1], gateway, dst_mac, self.txrx_pipeline))
+
+ return '\n'.join(('p {3} arpadd {0} {1} {2}'.format(*values) for values in arp_config))
+
+ def generate_arp_config6(self):
+ arp_config6 = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ gateway6 = self.get_ports_gateway6(port)
+ # omit entries with no gateway
+ if not gateway6:
+ continue
+ dst_mac6 = self.interfaces[int(port[-1])]["virtual-interface"]["dst_mac"]
+ arp_config6.append((port[-1], gateway6, dst_mac6, self.txrx_pipeline))
+
+ return '\n'.join(('p {3} arpadd {0} {1} {2}'.format(*values) for values in arp_config6))
+
+ def generate_action_config(self):
+ port_list = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ port_list.append(port[-1])
+
+ if self.vnf_type == "VFW":
+ template = FW_ACTION_TEMPLATE
+ else:
+ template = ACTION_TEMPLATE
+
+ return ''.join((template.format(port) for port in port_list))
+
+ def get_ip_from_port(self, port):
+ return self.make_ip_addr(self.get_ports_gateway(port), self.get_netmask_gateway(port))
+
+ def get_ip_and_prefixlen_from_ip_of_port(self, port):
+ ip_addr = self.get_ip_from_port(port)
+ # handle cases with no gateway
+ if ip_addr:
+ return ip_addr.ip.exploded, ip_addr.network.prefixlen
+ else:
+ return None, None
+
+ def generate_rule_config(self):
+ cmd = 'acl' if self.vnf_type == "ACL" else "vfw"
+ rules_config = self.rules if self.rules else ''
+ new_rules = []
+ new_ipv6_rules = []
+ pattern = 'p {0} add {1} {2} {3} {4} {5} 0 65535 0 65535 0 0 {6}'
+ for port_pair in self.port_pair_list:
+ src_port = int(port_pair[0][-1])
+ dst_port = int(port_pair[1][-1])
+
+ src_ip, src_prefix_len = self.get_ip_and_prefixlen_from_ip_of_port(port_pair[0])
+ dst_ip, dst_prefix_len = self.get_ip_and_prefixlen_from_ip_of_port(port_pair[1])
+ # ignore entires with empty values
+ if all((src_ip, src_prefix_len, dst_ip, dst_prefix_len)):
+ new_rules.append((cmd, self.txrx_pipeline, src_ip, src_prefix_len,
+ dst_ip, dst_prefix_len, dst_port))
+ new_rules.append((cmd, self.txrx_pipeline, dst_ip, dst_prefix_len,
+ src_ip, src_prefix_len, src_port))
+
+ src_ip = self.get_ports_gateway6(port_pair[0])
+ src_prefix_len = self.get_netmask_gateway6(port_pair[0])
+ dst_ip = self.get_ports_gateway6(port_pair[1])
+ dst_prefix_len = self.get_netmask_gateway6(port_pair[0])
+ # ignore entires with empty values
+ if all((src_ip, src_prefix_len, dst_ip, dst_prefix_len)):
+ new_ipv6_rules.append((cmd, self.txrx_pipeline, src_ip, src_prefix_len,
+ dst_ip, dst_prefix_len, dst_port))
+ new_ipv6_rules.append((cmd, self.txrx_pipeline, dst_ip, dst_prefix_len,
+ src_ip, src_prefix_len, src_port))
+
+ acl_apply = "\np %s applyruleset" % cmd
+ new_rules_config = '\n'.join(pattern.format(*values) for values
+ in chain(new_rules, new_ipv6_rules))
+ return ''.join([rules_config, new_rules_config, acl_apply])
+
+ def generate_script_data(self):
+ self.port_pair_list, self.networks = self.get_port_pairs(self.interfaces)
+ self.get_lb_count()
+ script_data = {
+ 'link_config': self.generate_link_config(),
+ 'arp_config': self.generate_arp_config(),
+ 'arp_config6': self.generate_arp_config6(),
+ 'actions': '',
+ 'rules': '',
+ }
+
+ if self.vnf_type in ('ACL', 'VFW'):
+ script_data.update({
+ 'actions': self.generate_action_config(),
+ 'rules': self.generate_rule_config(),
+ })
+
+ return script_data
+
+ def generate_script(self, vnfd, rules=None):
+ self.vnfd = vnfd
+ self.rules = rules
+ script_data = self.generate_script_data()
+ script = SCRIPT_TPL.format(**script_data)
+ if self.lb_config == self.HW_LB:
+ script += 'set fwd rxonly'
+ hwlb_tpl = """
+set_sym_hash_ena_per_port {0} enable
+set_hash_global_config {0} simple_xor ipv4-udp enable
+set_sym_hash_ena_per_port {1} enable
+set_hash_global_config {1} simple_xor ipv4-udp enable
+set_hash_input_set {0} ipv4-udp src-ipv4 udp-src-port add
+set_hash_input_set {1} ipv4-udp dst-ipv4 udp-dst-port add
+set_hash_input_set {0} ipv6-udp src-ipv6 udp-src-port add
+set_hash_input_set {1} ipv6-udp dst-ipv6 udp-dst-port add
+"""
+ for port_pair in self.port_pair_list:
+ script += hwlb_tpl.format(port_pair[0][-1], port_pair[1][-1])
+ return script
diff --git a/yardstick/network_services/libs/__init__.py b/yardstick/network_services/libs/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/yardstick/network_services/libs/__init__.py
diff --git a/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py b/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py
new file mode 100644
index 000000000..815a2a21c
--- /dev/null
+++ b/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py
@@ -0,0 +1,335 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import print_function
+import sys
+import logging
+
+import re
+from itertools import product
+
+from yardstick.common.utils import ErrorClass
+
+try:
+ import IxNetwork
+except ImportError:
+ IxNetwork = ErrorClass
+
+log = logging.getLogger(__name__)
+
+IP_VERSION_4 = 4
+IP_VERSION_6 = 6
+
+
+class TrafficStreamHelper(object):
+
+ TEMPLATE = '{0.traffic_item}/{0.stream}:{0.param_id}/{1}'
+
+ def __init__(self, traffic_item, stream, param_id):
+ super(TrafficStreamHelper, self).__init__()
+ self.traffic_item = traffic_item
+ self.stream = stream
+ self.param_id = param_id
+
+ def __getattr__(self, item):
+ return self.TEMPLATE.format(self, item)
+
+
+class FramesizeHelper(object):
+
+ def __init__(self):
+ super(FramesizeHelper, self).__init__()
+ self.weighted_pairs = []
+ self.weighted_range_pairs = []
+
+ @property
+ def weighted_pairs_arg(self):
+ return '-weightedPairs', self.weighted_pairs
+
+ @property
+ def weighted_range_pairs_arg(self):
+ return '-weightedRangePairs', self.weighted_range_pairs
+
+ def make_args(self, *args):
+ return self.weighted_pairs_arg + self.weighted_range_pairs_arg + args
+
+ def populate_data(self, framesize_data):
+ for key, value in framesize_data.items():
+ if value == '0':
+ continue
+
+ replaced = re.sub('[Bb]', '', key)
+ self.weighted_pairs.extend([
+ replaced,
+ value,
+ ])
+ pairs = [
+ replaced,
+ replaced,
+ value,
+ ]
+ self.weighted_range_pairs.append(pairs)
+
+
+class IxNextgen(object):
+
+ STATS_NAME_MAP = {
+ "traffic_item": 'Traffic Item',
+ "Tx_Frames": 'Tx Frames',
+ "Rx_Frames": 'Rx Frames',
+ "Tx_Frame_Rate": 'Tx Frame Rate',
+ "Rx_Frame_Rate": 'Tx Frame Rate',
+ "Store-Forward_Avg_latency_ns": 'Store-Forward Avg Latency (ns)',
+ "Store-Forward_Min_latency_ns": 'Store-Forward Min Latency (ns)',
+ "Store-Forward_Max_latency_ns": 'Store-Forward Max Latency (ns)',
+ }
+
+ PORT_STATS_NAME_MAP = {
+ "stat_name": 'Stat Name',
+ "Frames_Tx": 'Frames Tx.',
+ "Valid_Frames_Rx": 'Valid Frames Rx.',
+ "Frames_Tx_Rate": 'Frames Tx. Rate',
+ "Valid_Frames_Rx_Rate": 'Valid Frames Rx. Rate',
+ "Tx_Rate_Kbps": 'Tx. Rate (Kbps)',
+ "Rx_Rate_Kbps": 'Rx. Rate (Kbps)',
+ "Tx_Rate_Mbps": 'Tx. Rate (Mbps)',
+ "Rx_Rate_Mbps": 'Rx. Rate (Mbps)',
+ }
+
+ LATENCY_NAME_MAP = {
+ "Store-Forward_Avg_latency_ns": 'Store-Forward Avg Latency (ns)',
+ "Store-Forward_Min_latency_ns": 'Store-Forward Min Latency (ns)',
+ "Store-Forward_Max_latency_ns": 'Store-Forward Max Latency (ns)',
+ }
+
+ RANDOM_MASK_MAP = {
+ IP_VERSION_4: '0.0.0.255',
+ IP_VERSION_6: '0:0:0:0:0:0:0:ff',
+ }
+
+ MODE_SEEDS_MAP = {
+ 0: ('private', ['256', '2048']),
+ }
+
+ MODE_SEEDS_DEFAULT = 'public', ['2048', '256']
+
+ @staticmethod
+ def find_view_obj(view_name, views):
+ edited_view_name = '::ixNet::OBJ-/statistics/view:"{}"'.format(view_name)
+ return next((view for view in views if edited_view_name == view), '')
+
+ @staticmethod
+ def get_config(tg_cfg):
+ external_interface = tg_cfg["vdu"][0]["external-interface"]
+ card_port0 = external_interface[0]["virtual-interface"]["vpci"]
+ card_port1 = external_interface[1]["virtual-interface"]["vpci"]
+ card0, port0 = card_port0.split(':')[:2]
+ card1, port1 = card_port1.split(':')[:2]
+ cfg = {
+ 'py_lib_path': tg_cfg["mgmt-interface"]["tg-config"]["py_lib_path"],
+ 'machine': tg_cfg["mgmt-interface"]["ip"],
+ 'port': tg_cfg["mgmt-interface"]["tg-config"]["tcl_port"],
+ 'chassis': tg_cfg["mgmt-interface"]["tg-config"]["ixchassis"],
+ 'card1': card0,
+ 'port1': port0,
+ 'card2': card1,
+ 'port2': port1,
+ 'output_dir': tg_cfg["mgmt-interface"]["tg-config"]["dut_result_dir"],
+ 'version': tg_cfg["mgmt-interface"]["tg-config"]["version"],
+ 'bidir': True,
+ }
+ return cfg
+
+ def __init__(self, ixnet=None):
+ self.ixnet = ixnet
+ self._objRefs = dict()
+ self._cfg = None
+ self._logger = logging.getLogger(__name__)
+ self._params = None
+ self._bidir = None
+
+ def iter_over_get_lists(self, x1, x2, y2, offset=0):
+ for x in self.ixnet.getList(x1, x2):
+ y_list = self.ixnet.getList(x, y2)
+ for i, y in enumerate(y_list, offset):
+ yield x, y, i
+
+ def set_random_ip_multi_attribute(self, ipv4, seed, fixed_bits, random_mask, l3_count):
+ self.ixnet.setMultiAttribute(
+ ipv4,
+ '-seed', str(seed),
+ '-fixedBits', str(fixed_bits),
+ '-randomMask', str(random_mask),
+ '-valueType', 'random',
+ '-countValue', str(l3_count))
+
+ def set_random_ip_multi_attributes(self, ip, version, seeds, l3):
+ try:
+ random_mask = self.RANDOM_MASK_MAP[version]
+ except KeyError:
+ raise ValueError('Unknown version %s' % version)
+
+ l3_count = l3['count']
+ if "srcIp" in ip:
+ fixed_bits = l3['srcip4']
+ self.set_random_ip_multi_attribute(ip, seeds[0], fixed_bits, random_mask, l3_count)
+ if "dstIp" in ip:
+ fixed_bits = l3['dstip4']
+ self.set_random_ip_multi_attribute(ip, seeds[1], fixed_bits, random_mask, l3_count)
+
+ def add_ip_header(self, params, version):
+ for it, ep, i in self.iter_over_get_lists('/traffic', 'trafficItem', "configElement"):
+ mode, seeds = self.MODE_SEEDS_MAP.get(i, self.MODE_SEEDS_DEFAULT)
+ l3 = params[mode]['outer_l3']
+
+ for ip, ip_bits, _ in self.iter_over_get_lists(ep, 'stack', 'field'):
+ self.set_random_ip_multi_attributes(ip_bits, version, seeds, l3)
+
+ self.ixnet.commit()
+
+ def _connect(self, tg_cfg):
+ self._cfg = self.get_config(tg_cfg)
+
+ sys.path.append(self._cfg["py_lib_path"])
+ self.ixnet = IxNetwork.IxNet()
+
+ machine = self._cfg['machine']
+ port = str(self._cfg['port'])
+ version = str(self._cfg['version'])
+ result = self.ixnet.connect(machine, '-port', port, '-version', version)
+ return result
+
+ def clear_ixia_config(self):
+ self.ixnet.execute('newConfig')
+
+ def load_ixia_profile(self, profile):
+ self.ixnet.execute('loadConfig', self.ixnet.readFrom(profile))
+
+ def ix_load_config(self, profile):
+ self.clear_ixia_config()
+ self.load_ixia_profile(profile)
+
+ def ix_assign_ports(self):
+ vports = self.ixnet.getList(self.ixnet.getRoot(), 'vport')
+ ports = [
+ (self._cfg['chassis'], self._cfg['card1'], self._cfg['port1']),
+ (self._cfg['chassis'], self._cfg['card2'], self._cfg['port2']),
+ ]
+
+ vport_list = self.ixnet.getList("/", "vport")
+ self.ixnet.execute('assignPorts', ports, [], vport_list, True)
+ self.ixnet.commit()
+
+ for vport in vports:
+ if self.ixnet.getAttribute(vport, '-state') != 'up':
+ log.error("Both thr ports are down...")
+
+ def ix_update_frame(self, params):
+ streams = ["configElement"]
+
+ for param in params.values():
+ framesize_data = FramesizeHelper()
+ traffic_items = self.ixnet.getList('/traffic', 'trafficItem')
+ param_id = param['id']
+ for traffic_item, stream in product(traffic_items, streams):
+ helper = TrafficStreamHelper(traffic_item, stream, param_id)
+
+ self.ixnet.setMultiAttribute(helper.transmissionControl,
+ '-type', '{0}'.format(param['traffic_type']),
+ '-duration', '{0}'.format(param['duration']))
+
+ stream_frame_rate_path = helper.frameRate
+ self.ixnet.setMultiAttribute(stream_frame_rate_path, '-rate', param['iload'])
+ if param['outer_l2']['framesPerSecond']:
+ self.ixnet.setMultiAttribute(stream_frame_rate_path,
+ '-type', 'framesPerSecond')
+
+ framesize_data.populate_data(param['outer_l2']['framesize'])
+
+ make_attr_args = framesize_data.make_args('-incrementFrom', '66',
+ '-randomMin', '66',
+ '-quadGaussian', [],
+ '-type', 'weightedPairs',
+ '-presetDistribution', 'cisco',
+ '-incrementTo', '1518')
+
+ self.ixnet.setMultiAttribute(helper.frameSize, *make_attr_args)
+
+ self.ixnet.commit()
+
+ def update_ether_multi_attribute(self, ether, mac_addr):
+ self.ixnet.setMultiAttribute(ether,
+ '-singleValue', mac_addr,
+ '-fieldValue', mac_addr,
+ '-valueType', 'singleValue')
+
+ def update_ether_multi_attributes(self, ether, l2):
+ if "ethernet.header.destinationAddress" in ether:
+ self.update_ether_multi_attribute(ether, str(l2['dstmac']))
+
+ if "ethernet.header.sourceAddress" in ether:
+ self.update_ether_multi_attribute(ether, str(l2['srcmac']))
+
+ def ix_update_ether(self, params):
+ for ti, ep, index in self.iter_over_get_lists('/traffic', 'trafficItem',
+ "configElement", 1):
+ iter1 = (v['outer_l2'] for v in params.values() if str(v['id']) == str(index))
+ try:
+ l2 = next(iter1, {})
+ except KeyError:
+ continue
+
+ for ip, ether, _ in self.iter_over_get_lists(ep, 'stack', 'field'):
+ self.update_ether_multi_attributes(ether, l2)
+
+ self.ixnet.commit()
+
+ def ix_update_udp(self, params):
+ pass
+
+ def ix_update_tcp(self, params):
+ pass
+
+ def ix_start_traffic(self):
+ tis = self.ixnet.getList('/traffic', 'trafficItem')
+ for ti in tis:
+ self.ixnet.execute('generate', [ti])
+ self.ixnet.execute('apply', '/traffic')
+ self.ixnet.execute('start', '/traffic')
+
+ def ix_stop_traffic(self):
+ tis = self.ixnet.getList('/traffic', 'trafficItem')
+ for _ in tis:
+ self.ixnet.execute('stop', '/traffic')
+
+ def build_stats_map(self, view_obj, name_map):
+ return {kl: self.execute_get_column_values(view_obj, kr) for kl, kr in name_map.items()}
+
+ def execute_get_column_values(self, view_obj, name):
+ return self.ixnet.execute('getColumnValues', view_obj, name)
+
+ def ix_get_statistics(self):
+ views = self.ixnet.getList('/statistics', 'view')
+ view_obj = self.find_view_obj("Traffic Item Statistics", views)
+ stats = self.build_stats_map(view_obj, self.STATS_NAME_MAP)
+
+ self.find_view_obj("Port Statistics", views)
+ ports_stats = self.build_stats_map(view_obj, self.PORT_STATS_NAME_MAP)
+
+ views = self.ixnet.getList('/statistics', 'view')
+ view_obj = self.find_view_obj("Flow Statistics", views)
+ stats["latency"] = self.build_stats_map(view_obj, self.LATENCY_NAME_MAP)
+
+ return stats, ports_stats
diff --git a/yardstick/network_services/libs/ixia_libs/IxNet/__init__.py b/yardstick/network_services/libs/ixia_libs/IxNet/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/yardstick/network_services/libs/ixia_libs/IxNet/__init__.py
diff --git a/yardstick/network_services/libs/ixia_libs/__init__.py b/yardstick/network_services/libs/ixia_libs/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/yardstick/network_services/libs/ixia_libs/__init__.py
diff --git a/yardstick/network_services/nfvi/collectd.conf b/yardstick/network_services/nfvi/collectd.conf
index abcf24ded..6d8b73f7f 100644
--- a/yardstick/network_services/nfvi/collectd.conf
+++ b/yardstick/network_services/nfvi/collectd.conf
@@ -15,7 +15,7 @@
Hostname "nsb_stats"
FQDNLookup true
-Interval 5
+Interval {interval}
##############################################################################
# LoadPlugin section #
@@ -23,10 +23,8 @@ Interval 5
# Specify what features to activate. #
##############################################################################
-LoadPlugin amqp
-LoadPlugin cpu
-LoadPlugin intel_rdt
-LoadPlugin memory
+#LoadPlugin syslog
+{loadplugin}
##############################################################################
# Plugin configuration #
@@ -35,6 +33,10 @@ LoadPlugin memory
# ription of those options is available in the collectd.conf(5) manual page. #
##############################################################################
+#<Plugin syslog>
+# LogLevel debug
+#</Plugin>
+
<Plugin amqp>
<Publish "name">
Host "0.0.0.0"
@@ -53,7 +55,7 @@ LoadPlugin memory
<Plugin cpu>
ReportByCpu true
ReportByState true
- ValuesPercentage false
+ ValuesPercentage true
</Plugin>
<Plugin memory>
@@ -61,18 +63,47 @@ LoadPlugin memory
ValuesPercentage false
</Plugin>
-<LoadPlugin intel_rdt>
- Interval 5
-</LoadPlugin>
<Plugin "intel_rdt">
Cores ""
</Plugin>
-<Plugin memcached>
- <Instance "local">
- Host "127.0.0.1"
- Port "11211"
- </Instance>
+<Plugin hugepages>
+ ReportPerNodeHP true
+ ReportRootHP true
+ ValuesPages true
+ ValuesBytes false
+ ValuesPercentage false
+</Plugin>
+
+<Plugin hugepages>
+ ReportPerNodeHP true
+ ReportRootHP true
+ ValuesPages true
+ ValuesBytes false
+ ValuesPercentage false
+</Plugin>
+
+<Plugin dpdkstat>
+ <EAL>
+ Coremask "0x1"
+ MemoryChannels "4"
+ ProcessType "secondary"
+ FilePrefix "rte"
+ </EAL>
+ SharedMemObj "dpdk_collectd_stats_0"
+ EnabledPortMask 0xffff
+ {dpdk_interface}
+</Plugin>
+
+<Plugin virt>
+ Domain "samplevnf"
+</Plugin>
+
+<Plugin ovs_stats>
+ Port "6640"
+ Address "127.0.0.1"
+ Socket "/usr/local/var/run/openvswitch/db.sock"
+ Bridges "br0" "br_ext"
</Plugin>
<Include "/etc/collectd/collectd.conf.d">
diff --git a/yardstick/network_services/nfvi/collectd.sh b/yardstick/network_services/nfvi/collectd.sh
index 7acb40431..8162ec539 100755
--- a/yardstick/network_services/nfvi/collectd.sh
+++ b/yardstick/network_services/nfvi/collectd.sh
@@ -22,8 +22,20 @@ if [ "$(whoami)" != "root" ]; then
exit 1;
fi
+echo "setup proxy..."
+http_proxy=$1
+https_proxy=$2
+if [[ "$http_proxy" != "" ]]; then
+ export http_proxy=$http_proxy
+ export https_proxy=$http_proxy
+fi
+
+if [[ "$https_proxy" != "" ]]; then
+ export https_proxy=$https_proxy
+fi
+
echo "Install required libraries to run collectd..."
-pkg=(git flex bison build-essential pkg-config automake autotools-dev libltdl-dev librabbitmq-dev rabbitmq-server)
+pkg=(git flex bison build-essential pkg-config automake autotools-dev libltdl-dev librabbitmq-dev rabbitmq-server cmake)
for i in "${pkg[@]}"; do
dpkg-query -W --showformat='${Status}\n' "${i}"|grep "install ok installed"
if [ "$?" -eq "1" ]; then
@@ -43,7 +55,6 @@ else
rm -rf intel-cmt-cat >/dev/null
git clone https://github.com/01org/intel-cmt-cat.git
pushd intel-cmt-cat
- git checkout tags/v1.5 -b v1.5
make install PREFIX=/usr
popd
@@ -51,7 +62,59 @@ else
echo "Done."
fi
-which /opt/nsb_bin/collectd/collectd >/dev/null
+ls /usr/lib/libdpdk.so >/dev/null
+if [ $? -eq 0 ]
+then
+ echo "DPDK already installed. Done"
+else
+ pushd .
+
+ echo "Get dpdk and install..."
+ mkdir -p $INSTALL_NSB_BIN
+ rm -rf "$INSTALL_NSB_BIN"/dpdk >/dev/null
+ git clone http://dpdk.org/git/dpdk
+ pushd dpdk
+ mkdir -p /mnt/huge
+ mount -t hugetlbfs nodev /mnt/huge
+ sed -i 's/CONFIG_RTE_BUILD_SHARED_LIB=n/CONFIG_RTE_BUILD_SHARED_LIB=y/g' config/common_base
+ sed -i 's/CONFIG_RTE_EAL_PMD_PATH=""/CONFIG_RTE_EAL_PMD_PATH="\/usr\/lib\/dpdk-pmd\/"/g' config/common_base
+
+ echo "Build dpdk v16.04"
+ make config T=x86_64-native-linuxapp-gcc
+ make
+ sudo make install prefix=/usr
+ mkdir -p /usr/lib/dpdk-pmd
+ find /usr/lib -type f -name 'librte_pmd*' | while read path ; do ln -s $path /usr/lib/dpdk-pmd/`echo $path | grep -o 'librte_.*so'` ; done
+
+ echo "Disable ASLR."
+ echo 0 > /proc/sys/kernel/randomize_va_space
+ make install PREFIX=/usr
+ popd
+
+ popd
+ echo "Done."
+fi
+
+which $INSTALL_NSB_BIN/yajl > /dev/null
+if [ -f "/usr/local/lib/libyajl.so.2.1.1" ]
+then
+ echo "ovs stats libs already installed."
+else
+ echo "installing ovs stats libraries"
+ pushd .
+
+ cd $INSTALL_NSB_BIN
+ git clone https://github.com/lloyd/yajl.git
+ pushd yajl
+ ./configure
+ make
+ make install
+ popd
+
+ popd
+fi
+
+which $INSTALL_NSB_BIN/collectd/collectd >/dev/null
if [ $? -eq 0 ]
then
echo "Collectd already installed. Done"
@@ -62,9 +125,9 @@ else
git clone https://github.com/collectd/collectd.git
pushd collectd
git stash
- git checkout -b collectd 43a4db3b3209f497a0ba408aebf8aee385c6262d
+ git checkout -n nfvi 47c86ace348a1d7a5352a83d10935209f89aa4f5
./build.sh
- ./configure --with-libpqos=/usr/
+ ./configure --with-libpqos=/usr/ --with-libdpdk=/usr --with-libyajl=/usr/local --enable-debug --enable-dpdkstat --enable-virt --enable-ovs_stats
make install > /dev/null
popd
echo "Done."
diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py
index 18b0d8952..ce09b6597 100644
--- a/yardstick/network_services/nfvi/resource.py
+++ b/yardstick/network_services/nfvi/resource.py
@@ -14,19 +14,28 @@
""" Resource collection definitions """
from __future__ import absolute_import
+from __future__ import print_function
+import tempfile
import logging
+import os
import os.path
import re
import multiprocessing
+from collections import Sequence
+
from oslo_config import cfg
from yardstick import ssh
from yardstick.network_services.nfvi.collectd import AmqpConsumer
from yardstick.network_services.utils import provision_tool
+LOG = logging.getLogger(__name__)
+
CONF = cfg.CONF
ZMQ_OVS_PORT = 5567
ZMQ_POLLING_TIME = 12000
+LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "intel_rdt", "memory",
+ "hugepages", "dpdkstat", "virt", "ovs_stats"]
class ResourceProfile(object):
@@ -34,16 +43,17 @@ class ResourceProfile(object):
This profile adds a resource at the beginning of the test session
"""
- def __init__(self, vnfd, cores):
+ def __init__(self, mgmt, interfaces=None, cores=None):
self.enable = True
self.connection = None
- self.cores = cores
+ self.cores = cores if isinstance(cores, Sequence) else []
+ self._queue = multiprocessing.Queue()
+ self.amqp_client = None
+ self.interfaces = interfaces if isinstance(interfaces, Sequence) else []
- mgmt_interface = vnfd.get("mgmt-interface")
# why the host or ip?
- self.vnfip = mgmt_interface.get("host", mgmt_interface["ip"])
- self.connection = ssh.SSH.from_node(mgmt_interface,
- overrides={"ip": self.vnfip})
+ self.vnfip = mgmt.get("host", mgmt["ip"])
+ self.connection = ssh.SSH.from_node(mgmt, overrides={"ip": self.vnfip})
self.connection.wait()
@@ -52,81 +62,147 @@ class ResourceProfile(object):
err, pid, _ = self.connection.execute("pgrep -f %s" % process)
return [err == 0, pid]
- def run_collectd_amqp(self, queue):
+ def run_collectd_amqp(self):
""" run amqp consumer to collect the NFVi data """
- amqp = \
- AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip),
- queue)
+ amqp_url = 'amqp://admin:admin@{}:5672/%2F'.format(self.vnfip)
+ amqp = AmqpConsumer(amqp_url, self._queue)
try:
amqp.run()
except (AttributeError, RuntimeError, KeyboardInterrupt):
amqp.stop()
@classmethod
- def get_cpu_data(cls, reskey, value):
+ def parse_simple_resource(cls, key, value):
+ return {'/'.join(key): value.split(":")[1]}
+
+ @classmethod
+ def get_cpu_data(cls, key_split, value):
""" Get cpu topology of the host """
pattern = r"-(\d+)"
- if "cpufreq" in reskey[1]:
- match = re.search(pattern, reskey[2], re.MULTILINE)
- metric = reskey[1]
+ if "cpufreq" in key_split[0]:
+ metric = key_split[0]
+ source = key_split[1]
else:
- match = re.search(pattern, reskey[1], re.MULTILINE)
- metric = reskey[2]
+ metric = key_split[1]
+ source = key_split[0]
+
+ match = re.search(pattern, source, re.MULTILINE)
+ if not match:
+ return "error", "Invalid", "", ""
+
+ time, value = value.split(":")
+ return str(match.group(1)), metric, value, time
+
+ @classmethod
+ def parse_hugepages(cls, key, value):
+ return cls.parse_simple_resource(key, value)
- time, val = re.split(":", value)
- if match:
- return [str(match.group(1)), metric, val, time]
+ @classmethod
+ def parse_dpdkstat(cls, key, value):
+ return cls.parse_simple_resource(key, value)
+
+ @classmethod
+ def parse_virt(cls, key, value):
+ return cls.parse_simple_resource(key, value)
- return ["error", "Invalid", ""]
+ @classmethod
+ def parse_ovs_stats(cls, key, value):
+ return cls.parse_simple_resource(key, value)
- def parse_collectd_result(self, metrics, listcores):
+ def parse_collectd_result(self, metrics, core_list):
""" convert collectd data into json"""
- res = {"cpu": {}, "memory": {}}
+ result = {
+ "cpu": {},
+ "memory": {},
+ "hugepages": {},
+ "dpdkstat": {},
+ "virt": {},
+ "ovs_stats": {},
+ }
testcase = ""
for key, value in metrics.items():
- reskey = key.rsplit("/")
- if "cpu" in reskey[1] or "intel_rdt" in reskey[1]:
- cpu_key, name, metric, testcase = \
- self.get_cpu_data(reskey, value)
- if cpu_key in listcores:
- res["cpu"].setdefault(cpu_key, {}).update({name: metric})
- elif "memory" in reskey[1]:
- val = re.split(":", value)[1]
- res["memory"].update({reskey[2]: val})
- res["timestamp"] = testcase
-
- return res
-
- def amqp_collect_nfvi_kpi(self, _queue=multiprocessing.Queue()):
+ key_split = key.split("/")
+ res_key_iter = (key for key in key_split if "nsb_stats" not in key)
+ res_key0 = next(res_key_iter)
+ res_key1 = next(res_key_iter)
+
+ if "cpu" in res_key0 or "intel_rdt" in res_key0:
+ cpu_key, name, metric, testcase = self.get_cpu_data(key_split, value)
+ if cpu_key in core_list:
+ result["cpu"].setdefault(cpu_key, {}).update({name: metric})
+
+ elif "memory" in res_key0:
+ result["memory"].update({res_key1: value.split(":")[0]})
+
+ elif "hugepages" in res_key0:
+ result["hugepages"].update(self.parse_hugepages(key, value))
+
+ elif "dpdkstat" in res_key0:
+ result["dpdkstat"].update(self.parse_dpdkstat(key, value))
+
+ elif "virt" in res_key1:
+ result["virt"].update(self.parse_virt(key, value))
+
+ elif "ovs_stats" in res_key0:
+ result["ovs_stats"].update(self.parse_ovs_stats(key, value))
+
+ result["timestamp"] = testcase
+
+ return result
+
+ def amqp_process_for_nfvi_kpi(self):
""" amqp collect and return nfvi kpis """
- try:
- metric = {}
- amqp_client = \
- multiprocessing.Process(target=self.run_collectd_amqp,
- args=(_queue,))
- amqp_client.start()
- amqp_client.join(7)
- amqp_client.terminate()
-
- while not _queue.empty():
- metric.update(_queue.get())
- except (AttributeError, RuntimeError, TypeError, ValueError):
- logging.debug("Failed to get NFVi stats...")
- msg = {}
- else:
- msg = self.parse_collectd_result(metric, self.cores)
+ if self.amqp_client is None:
+ self.amqp_client = \
+ multiprocessing.Process(target=self.run_collectd_amqp)
+ self.amqp_client.start()
+ def amqp_collect_nfvi_kpi(self):
+ """ amqp collect and return nfvi kpis """
+ metric = {}
+ while not self._queue.empty():
+ metric.update(self._queue.get())
+ msg = self.parse_collectd_result(metric, self.cores)
return msg
- @classmethod
- def _start_collectd(cls, connection, bin_path):
- connection.execute('pkill -9 collectd')
+ def _provide_config_file(self, bin_path, nfvi_cfg, kwargs):
+ with open(os.path.join(bin_path, nfvi_cfg), 'r') as cfg:
+ template = cfg.read()
+ cfg, cfg_content = tempfile.mkstemp()
+ with os.fdopen(cfg, "w+") as cfg:
+ cfg.write(template.format(**kwargs))
+ cfg_file = os.path.join(bin_path, nfvi_cfg)
+ self.connection.put(cfg_content, cfg_file)
+
+ def _prepare_collectd_conf(self, bin_path):
+ """ Prepare collectd conf """
+ loadplugin = "\n".join("LoadPlugin {0}".format(plugin)
+ for plugin in LIST_PLUGINS_ENABLED)
+
+ interfaces = "\n".join("PortName '{0[name]}'".format(interface)
+ for interface in self.interfaces)
+
+ kwargs = {
+ "interval": '25',
+ "loadplugin": loadplugin,
+ "dpdk_interface": interfaces,
+ }
+
+ self._provide_config_file(bin_path, 'collectd.conf', kwargs)
+
+ def _start_collectd(self, connection, bin_path):
+ LOG.debug("Starting collectd to collect NFVi stats")
+ # temp disable
+ return
+ connection.execute('sudo pkill -9 collectd')
collectd = os.path.join(bin_path, "collectd.sh")
provision_tool(connection, collectd)
- provision_tool(connection, os.path.join(bin_path, "collectd.conf"))
+ self._prepare_collectd_conf(bin_path)
# Reset amqp queue
+ LOG.debug("reset and setup amqp to collect data from collectd")
+ connection.execute("sudo rm -rf /var/lib/rabbitmq/mnesia/rabbit*")
connection.execute("sudo service rabbitmq-server start")
connection.execute("sudo rabbitmqctl stop_app")
connection.execute("sudo rabbitmqctl reset")
@@ -134,8 +210,15 @@ class ResourceProfile(object):
connection.execute("sudo service rabbitmq-server restart")
# Run collectd
- connection.execute(collectd)
- connection.execute(os.path.join(bin_path, "collectd", "collectd"))
+
+ http_proxy = os.environ.get('http_proxy', '')
+ https_proxy = os.environ.get('https_proxy', '')
+ connection.execute("sudo %s '%s' '%s'" %
+ (collectd, http_proxy, https_proxy))
+ LOG.debug("Start collectd service.....")
+ connection.execute(
+ "sudo %s" % os.path.join(bin_path, "collectd", "collectd"))
+ LOG.debug("Done")
def initiate_systemagent(self, bin_path):
""" Start system agent for NFVi collection on host """
@@ -145,16 +228,24 @@ class ResourceProfile(object):
def start(self):
""" start nfvi collection """
if self.enable:
- logging.debug("Start NVFi metric collection...")
+ LOG.debug("Start NVFi metric collection...")
def stop(self):
""" stop nfvi collection """
- if self.enable:
- agent = "collectd"
- logging.debug("Stop resource monitor...")
- status, pid = self.check_if_sa_running(agent)
- if status:
- self.connection.execute('kill -9 %s' % pid)
- self.connection.execute('pkill -9 %s' % agent)
- self.connection.execute('service rabbitmq-server stop')
- self.connection.execute("sudo rabbitmqctl stop_app")
+ if not self.enable:
+ return
+
+ agent = "collectd"
+ LOG.debug("Stop resource monitor...")
+
+ if self.amqp_client is not None:
+ self.amqp_client.terminate()
+
+ status, pid = self.check_if_sa_running(agent)
+ if status == 0:
+ return
+
+ self.connection.execute('sudo kill -9 %s' % pid)
+ self.connection.execute('sudo pkill -9 %s' % agent)
+ self.connection.execute('sudo service rabbitmq-server stop')
+ self.connection.execute("sudo rabbitmqctl stop_app")
diff --git a/yardstick/network_services/pipeline.py b/yardstick/network_services/pipeline.py
new file mode 100644
index 000000000..d781ba0cd
--- /dev/null
+++ b/yardstick/network_services/pipeline.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import print_function
+import itertools
+
+from six.moves import zip
+
+FIREWALL_ADD_DEFAULT = "p {0} firewall add default 1"
+FIREWALL_ADD_PRIO = """\
+p {0} firewall add priority 1 ipv4 {1} 24 0.0.0.0 0 0 65535 0 65535 6 0xFF port 0"""
+
+FLOW_ADD_QINQ_RULES = """\
+p {0} flow add qinq 128 512 port 0 id 1
+p {0} flow add default 1"""
+
+ACTION_FLOW_BULK = "p {0} action flow bulk /tmp/action_bulk_512.txt"
+ACTION_DSCP_CLASS_COLOR = "p {0} action dscp {1} class {2} color {3}"
+ROUTE_ADD_DEFAULT = "p {0} route add default 1"
+ROUTE_ADD_ETHER_QINQ = 'p {0} route add {1} {2} port 0 ether {3} qinq 0 {4}'
+ROUTE_ADD_ETHER_MPLS = "p {0} route add {1} 21 port 0 ether {2} mpls 0:{3}"
+
+
+class PipelineRules(object):
+
+ def __init__(self, pipeline_id=0):
+ super(PipelineRules, self).__init__()
+ self.rule_list = []
+ self.pipeline_id = pipeline_id
+
+ def __str__(self):
+ return '\n'.join(self.rule_list)
+
+ def get_string(self):
+ return str(self)
+
+ def next_pipeline(self, num=1):
+ self.pipeline_id += num
+
+ def add_newline(self):
+ self.rule_list.append('')
+
+ def add_rule(self, base, *args):
+ self.rule_list.append(base.format(self.pipeline_id, *args))
+
+ def add_firewall_prio(self, ip):
+ self.add_rule(FIREWALL_ADD_PRIO, ip)
+
+ def add_firewall_script(self, ip):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ for i in range(256):
+ ip_addr[-2] = str(i)
+ ip = '.'.join(ip_addr)
+ self.add_firewall_prio(ip)
+ self.add_rule(FIREWALL_ADD_DEFAULT)
+ self.add_newline()
+
+ def add_flow_classification_script(self):
+ self.add_rule(FLOW_ADD_QINQ_RULES)
+
+ def add_flow_action(self):
+ self.add_rule(ACTION_FLOW_BULK)
+
+ def add_dscp_class_color(self, dscp, color):
+ self.add_rule(ACTION_DSCP_CLASS_COLOR, dscp, dscp % 4, color)
+
+ def add_flow_action2(self):
+ self.add_rule(ACTION_FLOW_BULK)
+ for dscp, color in zip(range(64), itertools.cycle('GYR')):
+ self.add_dscp_class_color(dscp, color)
+
+ def add_route_ether_mpls(self, ip, mac_addr, index):
+ self.add_rule(ROUTE_ADD_ETHER_MPLS, ip, mac_addr, index)
+
+ def add_route_script(self, ip, mac_addr):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ for index in range(0, 256, 8):
+ ip_addr[-2] = str(index)
+ ip = '.'.join(ip_addr)
+ self.add_route_ether_mpls(ip, mac_addr, index)
+ self.add_rule(ROUTE_ADD_DEFAULT)
+ self.add_newline()
+
+ def add_ether_qinq(self, ip, mask, mac_addr, index):
+ self.add_rule(ROUTE_ADD_ETHER_QINQ, ip, mask, mac_addr, index)
+
+ def add_route_script2(self, ip, mac_addr):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ mask = 24
+ for i in range(0, 256):
+ ip_addr[-2] = str(i)
+ ip = '.'.join(ip_addr)
+ self.add_ether_qinq(ip, mask, mac_addr, i)
+ self.add_rule(ROUTE_ADD_DEFAULT)
+ self.add_newline()
diff --git a/yardstick/network_services/traffic_profile/http_ixload.py b/yardstick/network_services/traffic_profile/http_ixload.py
new file mode 100644
index 000000000..8a4f97f04
--- /dev/null
+++ b/yardstick/network_services/traffic_profile/http_ixload.py
@@ -0,0 +1,294 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import sys
+import os
+import logging
+
+# ixload uses its own py2. So importing jsonutils fails. So adding below
+# workaround to support call from yardstick
+try:
+ from oslo_serialization import jsonutils
+except ImportError:
+ import json as jsonutils
+
+from yardstick.common.utils import join_non_strings
+from yardstick.common.utils import ErrorClass
+
+try:
+ from IxLoad import IxLoad, StatCollectorUtils
+except ImportError:
+ IxLoad = ErrorClass
+ StatCollectorUtils = ErrorClass
+
+LOG = logging.getLogger(__name__)
+CSV_FILEPATH_NAME = 'IxL_statResults.csv'
+
+STATS_TO_GET = (
+ 'HTTP_Client.csv',
+ 'HTTP_Server.csv',
+ 'L2-3 Stats for Client Ports.csv',
+ 'L2-3 Stats for Server Ports.csv',
+ 'IxLoad Detailed Report.html',
+ 'IxLoad Detailed Report.pdf'
+)
+
+HTTP_CLIENT_STATS = [
+ ["HTTP Client", "TCP Connections Established", "kSum"],
+ ["HTTP Client", "TCP Connection Requests Failed", "kSum"],
+ ["HTTP Client", "HTTP Simulated Users", "kSum"],
+ ["HTTP Client", "HTTP Concurrent Connections", "kSum"],
+ ["HTTP Client", "HTTP Connections", "kSum"],
+ ["HTTP Client", "HTTP Transactions", "kSum"],
+ ["HTTP Client", "HTTP Connection Attempts", "kSum"]
+]
+
+HTTP_SERVER_STATS = [
+ ["HTTP Server", "TCP Connections Established", "kSum"],
+ ["HTTP Server", "TCP Connection Requests Failed", "kSum"]
+]
+
+
+INCOMING_STAT_RECORD_TEMPLATE = """
+=====================================
+INCOMING STAT RECORD >>> %s
+Len = %s
+%s
+%s
+=====================================
+"""
+
+INCOMING_STAT_INTERVAL_TEMPLATE = """
+=====================================
+Incoming stats: Time interval: %s
+Incoming stats: Time interval: %s
+=====================================
+"""
+
+
+class IXLOADHttpTest(object):
+
+ def __init__(self, test_input):
+ self.test_input = jsonutils.loads(test_input)
+ self.parse_run_test()
+ self.ix_load = None
+ self.stat_utils = None
+ self.remote_server = None
+ self.config_file = None
+ self.results_on_windows = None
+ self.result_dir = None
+ self.chassis = None
+ self.card = None
+ self.ports_to_reassign = None
+
+ @staticmethod
+ def format_ports_for_reassignment(ports):
+ formatted = [join_non_strings(';', p) for p in ports if len(p) == 3]
+ LOG.debug('for client ports:%s', os.linesep.join(formatted))
+ return formatted
+
+ def reassign_ports(self, test, repository, ports_to_reassign):
+ LOG.debug('ReassignPorts: %s %s', test, repository)
+
+ chassis_chain = repository.cget('chassisChain')
+ LOG.debug('chassischain: %s', chassis_chain)
+ client_ports = ports_to_reassign[0::2]
+ server_ports = ports_to_reassign[1::2]
+
+ client_ports = self.format_ports_for_reassignment(client_ports)
+ LOG.debug('Reassigning client ports: %s', client_ports)
+ server_ports = self.format_ports_for_reassignment(server_ports)
+ LOG.debug('Reassigning server ports: %s', server_ports)
+ ports_to_set = client_ports + server_ports
+
+ try:
+ LOG.debug('Reassigning ports: %s', ports_to_set)
+ test.setPorts(ports_to_set)
+ except Exception:
+ LOG.error('Error: Could not remap port assignment for: %s',
+ ports_to_set)
+ self.ix_load.delete(repository)
+ self.ix_load.disconnect()
+ raise
+
+ @staticmethod
+ def stat_collector(*args):
+ LOG.debug(INCOMING_STAT_RECORD_TEMPLATE, args, len(args), args[0], args[1])
+
+ @staticmethod
+ def IxL_StatCollectorCommand(*args):
+ stats = args[1][3]
+ timestamp = args[1][1]
+ LOG.debug(INCOMING_STAT_INTERVAL_TEMPLATE, timestamp, stats)
+
+ @staticmethod
+ def set_results_dir(test_controller, results_on_windows):
+ """
+ If the folder doesn't exists on the Windows Client PC,
+ IxLoad will automatically create it.
+ """
+ try:
+ test_controller.setResultDir(results_on_windows)
+ except Exception:
+ LOG.error('Error creating results dir on Win: %s',
+ results_on_windows)
+ raise
+
+ def load_config_file(self, config_file):
+ try:
+ LOG.debug(config_file)
+ repository = self.ix_load.new("ixRepository", name=config_file)
+ return repository
+ except Exception:
+ LOG.error('Error: IxLoad config file not found: %s', config_file)
+ raise
+
+ def start_http_test(self):
+ self.ix_load = IxLoad()
+
+ LOG.debug('--- ixLoad obj: %s', self.ix_load)
+ try:
+ self.ix_load.connect(self.remote_server)
+ except Exception:
+ raise
+
+ log_tag = "IxLoad-api"
+ log_name = "reprun"
+ logger = self.ix_load.new("ixLogger", log_tag, 1)
+ log_engine = logger.getEngine()
+ log_engine.setLevels(self.ix_load.ixLogger.kLevelDebug,
+ self.ix_load.ixLogger.kLevelInfo)
+ log_engine.setFile(log_name, 2, 256, 1)
+
+ # Initialize stat collection utilities
+ self.stat_utils = StatCollectorUtils()
+
+ test_controller = self.ix_load.new("ixTestController", outputDir=1)
+
+ repository = self.load_config_file(self.config_file)
+
+ # Get the first test on the testList
+ test_name = repository.testList[0].cget("name")
+ test = repository.testList.getItem(test_name)
+
+ self.set_results_dir(test_controller, self.results_on_windows)
+
+ test.config(statsRequired=1, enableResetPorts=1, csvInterval=2,
+ enableForceOwnership=True)
+
+ # ---- Remap ports ----
+ try:
+ self.reassign_ports(test, repository, self.ports_to_reassign)
+ except Exception:
+ LOG.exception("Exception occurred during reassign_ports")
+
+ # -----------------------------------------------------------------------
+ # Set up stat Collection
+ # -----------------------------------------------------------------------
+ test_server_handle = test_controller.getTestServerHandle()
+ self.stat_utils.Initialize(test_server_handle)
+
+ # Clear any stats that may have been registered previously
+ self.stat_utils.ClearStats()
+
+ # Define the stats we would like to collect
+ self.stat_utils.AddStat(caption="Watch_Stat_1",
+ statSourceType="HTTP Client",
+ statName="TCP Connections Established",
+ aggregationType="kSum",
+ filterList={})
+
+ self.stat_utils.AddStat(caption="Watch_Stat_2",
+ statSourceType="HTTP Client",
+ statName="TCP Connection Requests Failed",
+ aggregationType="kSum",
+ filterList={})
+
+ self.stat_utils.AddStat(caption="Watch_Stat_3",
+ statSourceType="HTTP Server",
+ statName="TCP Connections Established",
+ aggregationType="kSum",
+ filterList={})
+
+ self.stat_utils.AddStat(caption="Watch_Stat_4",
+ statSourceType="HTTP Server",
+ statName="TCP Connection Requests Failed",
+ aggregationType="kSum",
+ filterList={})
+
+ self.stat_utils.StartCollector(self.IxL_StatCollectorCommand)
+
+ test_controller.run(test)
+ self.ix_load.waitForTestFinish()
+
+ test_controller.releaseConfigWaitFinish()
+
+ # Stop the collector (running in the tcl event loop)
+ self.stat_utils.StopCollector()
+
+ # Cleanup
+ test_controller.generateReport(detailedReport=1, format="PDF;HTML")
+ test_controller.releaseConfigWaitFinish()
+
+ self.ix_load.delete(test)
+ self.ix_load.delete(test_controller)
+ self.ix_load.delete(logger)
+ self.ix_load.delete(log_engine)
+
+ LOG.debug('Retrieving CSV stats from Windows Client PC ...')
+ for stat_file in STATS_TO_GET:
+ enhanced_stat_file = stat_file.replace('-', '')
+ enhanced_stat_file = enhanced_stat_file.replace(' ', '_')
+ enhanced_stat_file = enhanced_stat_file.replace('__', '_')
+
+ LOG.debug('Getting csv stat file: %s', stat_file)
+ src_file = os.path.join(self.results_on_windows, stat_file)
+ dst_file = os.path.join(self.result_dir, '_'.join(['ixLoad', enhanced_stat_file]))
+ self.ix_load.retrieveFileCopy(src_file, dst_file)
+
+ self.ix_load.disconnect()
+
+ def parse_run_test(self):
+ self.remote_server = self.test_input["remote_server"]
+ LOG.debug("remote tcl server: %s", self.remote_server)
+
+ self.config_file = self.test_input["ixload_cfg"]
+ LOG.debug("ixload config: %s", self.remote_server)
+
+ self.results_on_windows = 'C:/Results'
+ self.result_dir = self.test_input["result_dir"]
+ self.chassis = self.test_input["ixia_chassis"]
+ LOG.debug("remote ixia chassis: %s", self.chassis)
+
+ self.card = self.test_input["IXIA"]["card"]
+ self.ports_to_reassign = [
+ [self.chassis, self.card, port] for port in
+ self.test_input["IXIA"]["ports"]
+ ]
+
+ LOG.debug("Ports to be reassigned: %s", self.ports_to_reassign)
+
+
+def main(args):
+ # Get the args from cmdline and parse and run the test
+ test_input = "".join(args[1:])
+ if test_input:
+ ixload_obj = IXLOADHttpTest(test_input)
+ ixload_obj.start_http_test()
+
+if __name__ == '__main__':
+ main(sys.argv)
diff --git a/yardstick/network_services/traffic_profile/ixia_rfc2544.py b/yardstick/network_services/traffic_profile/ixia_rfc2544.py
new file mode 100644
index 000000000..5ba00180b
--- /dev/null
+++ b/yardstick/network_services/traffic_profile/ixia_rfc2544.py
@@ -0,0 +1,155 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import logging
+import json
+
+from yardstick.network_services.traffic_profile.traffic_profile import \
+ TrexProfile
+
+LOG = logging.getLogger(__name__)
+
+
+class IXIARFC2544Profile(TrexProfile):
+ def _get_ixia_traffic_profile(self, profile_data, mac={},
+ xfile=None, static_traffic={}):
+ result = {}
+ if xfile:
+ with open(xfile, 'r') as stream:
+ try:
+ static_traffic = json.load(stream)
+ except Exception as exc:
+ LOG.debug(exc)
+
+ for traffickey, trafficvalue in static_traffic.items():
+ traffic = static_traffic[traffickey]
+ # outer_l2
+ index = 0
+ for key, value in profile_data[traffickey].items():
+ framesize = value['outer_l2']['framesize']
+ traffic['outer_l2']['framesize'] = framesize
+ traffic['framesPerSecond'] = True
+ traffic['bidir'] = False
+ traffic['outer_l2']['srcmac'] = \
+ mac["src_mac_{}".format(traffic['id'])]
+ traffic['outer_l2']['dstmac'] = \
+ mac["dst_mac_{}".format(traffic['id'])]
+ # outer_l3
+ if "outer_l3v6" in list(value.keys()):
+ traffic['outer_l3'] = value['outer_l3v6']
+ srcip4 = value['outer_l3v6']['srcip6']
+ traffic['outer_l3']['srcip4'] = srcip4.split("-")[0]
+ dstip4 = value['outer_l3v6']['dstip6']
+ traffic['outer_l3']['dstip4'] = dstip4.split("-")[0]
+ else:
+ traffic['outer_l3'] = value['outer_l3v4']
+ srcip4 = value['outer_l3v4']['srcip4']
+ traffic['outer_l3']['srcip4'] = srcip4.split("-")[0]
+ dstip4 = value['outer_l3v4']['dstip4']
+ traffic['outer_l3']['dstip4'] = dstip4.split("-")[0]
+
+ traffic['outer_l3']['type'] = key
+ # outer_l4
+ traffic['outer_l4'] = value['outer_l4']
+ index = index + 1
+ result.update({traffickey: traffic})
+
+ return result
+
+ def _ixia_traffic_generate(self, traffic_generator, traffic, ixia_obj):
+ for key, value in traffic.items():
+ if "public" in key or "private" in key:
+ traffic[key]["iload"] = str(self.rate)
+ ixia_obj.ix_update_frame(traffic)
+ ixia_obj.ix_update_ether(traffic)
+ # ixia_obj.ix_update_ipv4(traffic)
+ ixia_obj.ix_start_traffic()
+ self.tmp_drop = 0
+ self.tmp_throughput = 0
+
+ def execute(self, traffic_generator, ixia_obj, mac={}, xfile=None):
+ if self.first_run:
+ self.full_profile = {}
+ self.pg_id = 0
+ self.profile = 'private_1'
+ for key, value in self.params.items():
+ if "private" in key or "public" in key:
+ self.profile_data = self.params[key]
+ self.get_streams(self.profile_data)
+ self.full_profile.update({key: self.profile_data})
+ traffic = \
+ self._get_ixia_traffic_profile(self.full_profile, mac, xfile)
+ self.max_rate = self.rate
+ self.min_rate = 0
+ self.get_multiplier()
+ self._ixia_traffic_generate(traffic_generator, traffic, ixia_obj)
+
+ def get_multiplier(self):
+ self.rate = round((self.max_rate + self.min_rate) / 2.0, 2)
+ multiplier = round(self.rate / self.pps, 2)
+ return str(multiplier)
+
+ def start_ixia_latency(self, traffic_generator, ixia_obj,
+ mac={}, xfile=None):
+ traffic = self._get_ixia_traffic_profile(self.full_profile, mac)
+ self._ixia_traffic_generate(traffic_generator, traffic,
+ ixia_obj, xfile)
+
+ def get_drop_percentage(self, traffic_generator, samples, tol_min,
+ tolerance, ixia_obj, mac={}, xfile=None):
+ status = 'Running'
+ drop_percent = 100
+ in_packets = sum([samples[iface]['in_packets'] for iface in samples])
+ out_packets = sum([samples[iface]['out_packets'] for iface in samples])
+ rx_throughput = \
+ sum([samples[iface]['RxThroughput'] for iface in samples])
+ tx_throughput = \
+ sum([samples[iface]['TxThroughput'] for iface in samples])
+ packet_drop = abs(out_packets - in_packets)
+ try:
+ drop_percent = round((packet_drop / float(out_packets)) * 100, 2)
+ except ZeroDivisionError:
+ LOG.info('No traffic is flowing')
+ samples['TxThroughput'] = round(tx_throughput / 1.0, 2)
+ samples['RxThroughput'] = round(rx_throughput / 1.0, 2)
+ samples['CurrentDropPercentage'] = drop_percent
+ samples['Throughput'] = self.tmp_throughput
+ samples['DropPercentage'] = self.tmp_drop
+ if drop_percent > tolerance and self.tmp_throughput == 0:
+ samples['Throughput'] = round(rx_throughput / 1.0, 2)
+ samples['DropPercentage'] = drop_percent
+ if self.first_run:
+ max_supported_rate = out_packets / 30.0
+ self.rate = max_supported_rate
+ self.first_run = False
+ if drop_percent <= tolerance:
+ status = 'Completed'
+ if drop_percent > tolerance:
+ self.max_rate = self.rate
+ elif drop_percent < tol_min:
+ self.min_rate = self.rate
+ if drop_percent >= self.tmp_drop:
+ self.tmp_drop = drop_percent
+ self.tmp_throughput = round((rx_throughput / 1.0), 2)
+ samples['Throughput'] = round(rx_throughput / 1.0, 2)
+ samples['DropPercentage'] = drop_percent
+ else:
+ samples['Throughput'] = round(rx_throughput / 1.0, 2)
+ samples['DropPercentage'] = drop_percent
+ return status, samples
+ self.get_multiplier()
+ traffic = self._get_ixia_traffic_profile(self.full_profile, mac, xfile)
+ self._ixia_traffic_generate(traffic_generator, traffic, ixia_obj)
+ return status, samples
diff --git a/yardstick/network_services/traffic_profile/rfc2544.py b/yardstick/network_services/traffic_profile/rfc2544.py
index 99964d329..b07bc9d5a 100644
--- a/yardstick/network_services/traffic_profile/rfc2544.py
+++ b/yardstick/network_services/traffic_profile/rfc2544.py
@@ -17,6 +17,10 @@ from __future__ import absolute_import
from __future__ import division
import logging
+from stl.trex_stl_lib.trex_stl_client import STLStream
+from stl.trex_stl_lib.trex_stl_streams import STLFlowLatencyStats
+from stl.trex_stl_lib.trex_stl_streams import STLTXCont
+
from yardstick.network_services.traffic_profile.traffic_profile \
import TrexProfile
@@ -28,79 +32,175 @@ class RFC2544Profile(TrexProfile):
def __init__(self, traffic_generator):
super(RFC2544Profile, self).__init__(traffic_generator)
+ self.generator = None
self.max_rate = None
self.min_rate = None
+ self.ports = None
self.rate = 100
- self.tmp_drop = None
- self.tmp_throughput = None
- self.profile_data = None
-
- def execute(self, traffic_generator):
- ''' Generate the stream and run traffic on the given ports '''
- if self.first_run:
- self.profile_data = self.params.get('private', '')
- ports = [traffic_generator.my_ports[0]]
- traffic_generator.client.add_streams(self.get_streams(),
- ports=ports[0])
- profile_data = self.params.get('public', '')
- if profile_data:
- self.profile_data = profile_data
- ports.append(traffic_generator.my_ports[1])
- traffic_generator.client.add_streams(self.get_streams(),
- ports=ports[1])
+ self.drop_percent_at_max_tx = None
+ self.throughput_max = None
- self.max_rate = self.rate
- self.min_rate = 0
- traffic_generator.client.start(ports=ports,
- mult=self.get_multiplier(),
- duration=30, force=True)
- self.tmp_drop = 0
- self.tmp_throughput = 0
+ def register_generator(self, generator):
+ self.generator = generator
+
+ def execute(self, traffic_generator=None):
+ """ Generate the stream and run traffic on the given ports """
+ if traffic_generator is not None and self.generator is None:
+ self.generator = traffic_generator
+
+ if self.ports is not None:
+ return
+
+ self.ports = []
+ priv_ports = self.generator.priv_ports
+ pub_ports = self.generator.pub_ports
+ # start from 1 for private_1, public_1, etc.
+ for index, (priv_port, pub_port) in enumerate(zip(priv_ports, pub_ports), 1):
+ profile_data = self.params.get('private_{}'.format(index), '')
+ self.ports.append(priv_port)
+ # pass profile_data directly, don't use self.profile_data
+ self.generator.client.add_streams(self.get_streams(profile_data), ports=priv_port)
+ profile_data = self.params.get('public_{}'.format(index), '')
+ # correlated traffic doesn't use public traffic?
+ if not profile_data or self.generator.rfc2544_helper.correlated_traffic:
+ continue
+ # just get the pub_port
+ self.ports.append(pub_port)
+ self.generator.client.add_streams(self.get_streams(profile_data), ports=pub_port)
+
+ self.max_rate = self.rate
+ self.min_rate = 0
+ self.generator.client.start(ports=self.ports, mult=self.get_multiplier(),
+ duration=30, force=True)
+ self.drop_percent_at_max_tx = 0
+ self.throughput_max = 0
def get_multiplier(self):
- ''' Get the rate at which next iternation to run '''
+ """ Get the rate at which next iteration to run """
self.rate = round((self.max_rate + self.min_rate) / 2.0, 2)
multiplier = round(self.rate / self.pps, 2)
return str(multiplier)
- def get_drop_percentage(self, traffic_generator,
- samples, tol_min, tolerance):
- ''' Calculate the drop percentage and run the traffic '''
- in_packets = sum([samples[iface]['in_packets'] for iface in samples])
- out_packets = sum([samples[iface]['out_packets'] for iface in samples])
+ def get_drop_percentage(self, generator=None):
+ """ Calculate the drop percentage and run the traffic """
+ if generator is None:
+ generator = self.generator
+ run_duration = self.generator.RUN_DURATION
+ samples = self.generator.generate_samples()
+
+ in_packets = sum([value['in_packets'] for value in samples.values()])
+ out_packets = sum([value['out_packets'] for value in samples.values()])
+
packet_drop = abs(out_packets - in_packets)
drop_percent = 100.0
try:
- drop_percent = round((packet_drop / float(out_packets)) * 100, 2)
+ drop_percent = round((packet_drop / float(out_packets)) * 100, 5)
except ZeroDivisionError:
LOGGING.info('No traffic is flowing')
- samples['TxThroughput'] = out_packets / 30
- samples['RxThroughput'] = in_packets / 30
- samples['CurrentDropPercentage'] = drop_percent
- samples['Throughput'] = self.tmp_throughput
- samples['DropPercentage'] = self.tmp_drop
- if drop_percent > tolerance and self.tmp_throughput == 0:
- samples['Throughput'] = (in_packets / 30)
- samples['DropPercentage'] = drop_percent
- if self.first_run:
- max_supported_rate = out_packets / 30
- self.rate = max_supported_rate
+
+ # TODO(esm): RFC2544 doesn't tolerate packet loss, why do we?
+ tolerance_low = generator.rfc2544_helper.tolerance_low
+ tolerance_high = generator.rfc2544_helper.tolerance_high
+
+ tx_rate = out_packets / run_duration
+ rx_rate = in_packets / run_duration
+
+ throughput_max = self.throughput_max
+ drop_percent_at_max_tx = self.drop_percent_at_max_tx
+
+ if self.drop_percent_at_max_tx is None:
+ self.rate = tx_rate
self.first_run = False
- if drop_percent > tolerance:
+
+ if drop_percent > tolerance_high:
+ # TODO(esm): why don't we discard results that are out of tolerance?
self.max_rate = self.rate
- elif drop_percent < tol_min:
+ if throughput_max == 0:
+ throughput_max = rx_rate
+ drop_percent_at_max_tx = drop_percent
+
+ elif drop_percent >= tolerance_low:
+ # TODO(esm): why do we update the samples dict in this case
+ # and not update our tracking values?
+ throughput_max = rx_rate
+ drop_percent_at_max_tx = drop_percent
+
+ elif drop_percent >= self.drop_percent_at_max_tx:
+ # TODO(esm): why don't we discard results that are out of tolerance?
self.min_rate = self.rate
- if drop_percent >= self.tmp_drop:
- self.tmp_drop = drop_percent
- self.tmp_throughput = (in_packets / 30)
- samples['Throughput'] = (in_packets / 30)
- samples['DropPercentage'] = drop_percent
+ self.drop_percent_at_max_tx = drop_percent_at_max_tx = drop_percent
+ self.throughput_max = throughput_max = rx_rate
+
else:
- samples['Throughput'] = (in_packets / 30)
- samples['DropPercentage'] = drop_percent
+ # TODO(esm): why don't we discard results that are out of tolerance?
+ self.min_rate = self.rate
+
+ generator.clear_client_stats()
+ generator.start_client(mult=self.get_multiplier(),
+ duration=run_duration, force=True)
+
+ # if correlated traffic update the Throughput
+ if generator.rfc2544_helper.correlated_traffic:
+ throughput_max *= 2
+
+ samples.update({
+ 'TxThroughput': tx_rate,
+ 'RxThroughput': rx_rate,
+ 'CurrentDropPercentage': drop_percent,
+ 'Throughput': throughput_max,
+ 'DropPercentage': drop_percent_at_max_tx,
+ })
- traffic_generator.client.clear_stats(ports=traffic_generator.my_ports)
- traffic_generator.client.start(ports=traffic_generator.my_ports,
- mult=self.get_multiplier(),
- duration=30, force=True)
return samples
+
+ def execute_latency(self, generator=None, samples=None):
+ if generator is None:
+ generator = self.generator
+
+ if samples is None:
+ samples = generator.generate_samples()
+
+ self.pps, multiplier = self.calculate_pps(samples)
+ self.ports = []
+ self.pg_id = self.params['traffic_profile'].get('pg_id', 1)
+ priv_ports = generator.priv_ports
+ pub_ports = generator.pub_ports
+ for index, (priv_port, pub_port) in enumerate(zip(priv_ports, pub_ports), 1):
+ profile_data = self.params.get('private_{}'.format(index), '')
+ self.ports.append(priv_port)
+ generator.client.add_streams(self.get_streams(profile_data),
+ ports=priv_port)
+
+ profile_data = self.params.get('public_{}'.format(index), '')
+ if not profile_data or generator.correlated_traffic:
+ continue
+
+ pub_port = generator.pub_ports[index]
+ self.ports.append(pub_port)
+ generator.client.add_streams(self.get_streams(profile_data),
+ ports=pub_port)
+
+ generator.start_client(ports=self.ports, mult=str(multiplier),
+ duration=120, force=True)
+ self.first_run = False
+
+ def calculate_pps(self, samples):
+ pps = round(samples['Throughput'] / 2, 2)
+ multiplier = round(self.rate / self.pps, 2)
+ return pps, multiplier
+
+ def create_single_stream(self, packet_size, pps, isg=0):
+ packet = self._create_single_packet(packet_size)
+ if pps:
+ stl_mode = STLTXCont(pps=pps)
+ else:
+ stl_mode = STLTXCont(pps=self.pps)
+ if self.pg_id:
+ LOGGING.debug("pg_id: %s", self.pg_id)
+ stl_flow_stats = STLFlowLatencyStats(pg_id=self.pg_id)
+ stream = STLStream(isg=isg, packet=packet, mode=stl_mode,
+ flow_stats=stl_flow_stats)
+ self.pg_id += 1
+ else:
+ stream = STLStream(isg=isg, packet=packet, mode=stl_mode)
+ return stream
diff --git a/yardstick/network_services/traffic_profile/traffic_profile.py b/yardstick/network_services/traffic_profile/traffic_profile.py
index 156cc6644..3e1f8d89f 100644
--- a/yardstick/network_services/traffic_profile/traffic_profile.py
+++ b/yardstick/network_services/traffic_profile/traffic_profile.py
@@ -399,16 +399,19 @@ class TrexProfile(TrafficProfile):
logging.debug("Imax: %s rate: %s", imix_count, self.rate)
return imix_count
- def get_streams(self):
- """ generate trex stream """
+ def get_streams(self, profile_data):
+ """ generate trex stream
+ :param profile_data:
+ :type profile_data:
+ """
self.streams = []
self.pps = self.params['traffic_profile'].get('frame_rate', 100)
- for packet_name in self.profile_data:
- outer_l2 = self.profile_data[packet_name].get('outer_l2')
+ for packet_name in profile_data:
+ outer_l2 = profile_data[packet_name].get('outer_l2')
imix_data = self.generate_imix_data(outer_l2)
if not imix_data:
imix_data = {64: self.pps}
- self.generate_vm(self.profile_data[packet_name])
+ self.generate_vm(profile_data[packet_name])
for size in imix_data:
self._generate_streams(size, imix_data[size])
self._generate_profile()
diff --git a/yardstick/network_services/utils.py b/yardstick/network_services/utils.py
index cb71a6029..0264bbc1c 100644
--- a/yardstick/network_services/utils.py
+++ b/yardstick/network_services/utils.py
@@ -45,17 +45,19 @@ def get_nsb_option(option, default=None):
return default
-def provision_tool(connection, tool_path):
+def provision_tool(connection, tool_path, tool_file=None):
"""
verify if the tool path exits on the node,
if not push the local binary to remote node
:return - Tool path
"""
+ if tool_file:
+ tool_path = os.path.join(tool_path, tool_file)
bin_path = get_nsb_option("bin_path")
- exit_status, stdout = connection.execute("which %s" % tool_path)[:2]
+ exit_status = connection.execute("which %s > /dev/null 2>&1" % tool_path)[0]
if exit_status == 0:
- return encodeutils.safe_decode(stdout, incoming='utf-8').rstrip()
+ return encodeutils.safe_decode(tool_path, incoming='utf-8').rstrip()
logging.warning("%s not found on %s, will try to copy from localhost",
tool_path, connection.host)
diff --git a/yardstick/network_services/vnf_generic/vnf/acl_vnf.py b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py
new file mode 100644
index 000000000..5f3c8a0cd
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py
@@ -0,0 +1,72 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import print_function
+import logging
+
+from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+from yardstick.network_services.yang_model import YangModel
+
+LOG = logging.getLogger(__name__)
+
+# ACL should work the same on all systems, we can provide the binary
+ACL_PIPELINE_COMMAND = \
+ 'sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}'
+
+ACL_COLLECT_KPI = r"""\
+ACL TOTAL:[^p]+pkts_processed"?:\s(\d+),[^p]+pkts_drop"?:\s(\d+),[^p]+pkts_received"?:\s(\d+),"""
+
+
+class AclApproxSetupEnvSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ APP_NAME = "vACL"
+ CFG_CONFIG = "/tmp/acl_config"
+ CFG_SCRIPT = "/tmp/acl_script"
+ PIPELINE_COMMAND = ACL_PIPELINE_COMMAND
+ HW_DEFAULT_CORE = 2
+ SW_DEFAULT_CORE = 5
+ DEFAULT_CONFIG_TPL_CFG = "acl.cfg"
+ VNF_TYPE = "ACL"
+
+
+class AclApproxVnf(SampleVNF):
+
+ APP_NAME = "vACL"
+ APP_WORD = 'acl'
+ COLLECT_KPI = ACL_COLLECT_KPI
+
+ COLLECT_MAP = {
+ 'packets_in': 3,
+ 'packets_fwd': 1,
+ 'packets_dropped': 2,
+ }
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = AclApproxSetupEnvSetupEnvHelper
+
+ super(AclApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
+ self.acl_rules = None
+
+ def scale(self, flavor=""):
+ raise NotImplementedError
+
+ def _start_vnf(self):
+ yang_model_path = find_relative_file(self.scenario_helper.options['rules'],
+ self.scenario_helper.task_path)
+ yang_model = YangModel(yang_model_path)
+ self.acl_rules = yang_model.get_rules()
+ super(AclApproxVnf, self)._start_vnf()
diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py
index 2df6037f3..955f9f03d 100644
--- a/yardstick/network_services/vnf_generic/vnf/base.py
+++ b/yardstick/network_services/vnf_generic/vnf/base.py
@@ -15,10 +15,6 @@
from __future__ import absolute_import
import logging
-import ipaddress
-import six
-
-from yardstick.network_services.utils import get_nsb_option
LOG = logging.getLogger(__name__)
@@ -61,192 +57,69 @@ class QueueFileWrapper(object):
self.q_out.get()
-class GenericVNF(object):
+class VnfdHelper(dict):
+
+ @property
+ def mgmt_interface(self):
+ return self["mgmt-interface"]
+
+ @property
+ def vdu(self):
+ return self['vdu']
+
+ @property
+ def vdu0(self):
+ return self.vdu[0]
+
+ @property
+ def interfaces(self):
+ return self.vdu0['external-interface']
+
+ @property
+ def kpi(self):
+ return self['benchmark']['kpi']
+
+ def find_virtual_interface(self, **kwargs):
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ virtual_intf = interface["virtual-interface"]
+ if virtual_intf[key] == value:
+ return interface
+
+ def find_interface(self, **kwargs):
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ if interface[key] == value:
+ return interface
+
+
+class VNFObject(object):
+
+ def __init__(self, name, vnfd):
+ super(VNFObject, self).__init__()
+ self.name = name
+ self.vnfd_helper = VnfdHelper(vnfd) # fixme: parse this into a structure
+
+
+class GenericVNF(VNFObject):
+
""" Class providing file-like API for generic VNF implementation """
- def __init__(self, vnfd):
- super(GenericVNF, self).__init__()
- self.vnfd = vnfd # fixme: parse this into a structure
+ def __init__(self, name, vnfd):
+ super(GenericVNF, self).__init__(name, vnfd)
# List of statistics we can obtain from this VNF
# - ETSI MANO 6.3.1.1 monitoring_parameter
- self.kpi = self._get_kpi_definition(vnfd)
+ self.kpi = self._get_kpi_definition()
# Standard dictionary containing params like thread no, buffer size etc
self.config = {}
self.runs_traffic = False
- self.name = "vnf__1" # name in topology file
- self.bin_path = get_nsb_option("bin_path", "")
- @classmethod
- def _get_kpi_definition(cls, vnfd):
+ def _get_kpi_definition(self):
""" Get list of KPIs defined in VNFD
:param vnfd:
:return: list of KPIs, e.g. ['throughput', 'latency']
"""
- return vnfd['benchmark']['kpi']
-
- @classmethod
- def get_ip_version(cls, ip_addr):
- """ get ip address version v6 or v4 """
- try:
- address = ipaddress.ip_address(six.text_type(ip_addr))
- except ValueError:
- LOG.error(ip_addr, " is not valid")
- return
- else:
- return address.version
-
- def _ip_to_hex(self, ip_addr):
- ip_x = ip_addr
- if self.get_ip_version(ip_addr) == 4:
- ip_to_convert = ip_addr.split(".")
- ip_octect = [int(octect) for octect in ip_to_convert]
- ip_x = "{0[0]:02X}{0[1]:02X}{0[2]:02X}{0[3]:02X}".format(ip_octect)
- return ip_x
-
- def _get_dpdk_port_num(self, name):
- for intf in self.vnfd['vdu'][0]['external-interface']:
- if name == intf['name']:
- return intf['virtual-interface']['dpdk_port_num']
-
- def _append_routes(self, ip_pipeline_cfg):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
-
- where = ip_pipeline_cfg.find("arp_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_add = ip_pipeline_cfg[where:]
-
- tmp = route_add.find('\n')
- route_add = route_add[tmp:]
-
- cmds = "arp_route_tbl ="
-
- for route in routing_table:
- net = self._ip_to_hex(route['network'])
- net_nm = self._ip_to_hex(route['netmask'])
- net_gw = self._ip_to_hex(route['gateway'])
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
-
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_add
-
- return ip_pipeline_cfg
-
- def _append_nd_routes(self, ip_pipeline_cfg):
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- where = ip_pipeline_cfg.find("nd_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_nd = ip_pipeline_cfg[where:]
-
- tmp = route_nd.find('\n')
- route_nd = route_nd[tmp:]
-
- cmds = "nd_route_tbl ="
-
- for route in routing_table:
- net = route['network']
- net_nm = route['netmask']
- net_gw = route['gateway']
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
-
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_nd
-
- return ip_pipeline_cfg
-
- def _get_port0localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0localip6 : %s", return_value)
- return return_value
-
- def _get_port1localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1localip6 : %s", return_value)
- return return_value
-
- def _get_port0prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['netmask']
- LOG.info("_get_port0prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port1prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['netmask']
- LOG.info("_get_port1prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port0gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0gateway6 : %s", return_value)
- return return_value
-
- def _get_port1gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1gateway6 : %s", return_value)
- return return_value
+ return self.vnfd_helper.kpi
def instantiate(self, scenario_cfg, context_cfg):
""" Prepare VNF for operation and start the VNF process/VM
@@ -284,11 +157,10 @@ class GenericVNF(object):
class GenericTrafficGen(GenericVNF):
""" Class providing file-like API for generic traffic generator """
- def __init__(self, vnfd):
- super(GenericTrafficGen, self).__init__(vnfd)
+ def __init__(self, name, vnfd):
+ super(GenericTrafficGen, self).__init__(name, vnfd)
self.runs_traffic = True
self.traffic_finished = False
- self.name = "tgen__1" # name in topology file
def run_traffic(self, traffic_profile):
""" Generate traffic on the wire according to the given params.
diff --git a/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py
new file mode 100644
index 000000000..f9980b165
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py
@@ -0,0 +1,123 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import time
+import logging
+
+from six.moves import zip
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+
+LOG = logging.getLogger(__name__)
+
+# CGNAPT should work the same on all systems, we can provide the binary
+CGNAPT_PIPELINE_COMMAND = 'sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}'
+WAIT_FOR_STATIC_NAPT = 4
+
+CGNAPT_COLLECT_KPI = """\
+CG-NAPT(.*\n)*\
+Received\s(\d+),\
+Missed\s(\d+),\
+Dropped\s(\d+),\
+Translated\s(\d+),\
+ingress\
+"""
+
+
+class CgnaptApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ APP_NAME = "vCGNAPT"
+ CFG_CONFIG = "/tmp/cgnapt_config"
+ CFG_SCRIPT = "/tmp/cgnapt_script"
+ DEFAULT_CONFIG_TPL_CFG = "cgnat.cfg"
+ PIPELINE_COMMAND = CGNAPT_PIPELINE_COMMAND
+ SW_DEFAULT_CORE = 6
+ HW_DEFAULT_CORE = 3
+ VNF_TYPE = "CGNAPT"
+
+ @staticmethod
+ def _generate_ip_from_pool(ip):
+ ip_parts = ip.split('.')
+ assert len(ip_parts) == 4
+ iter1 = (str(n) for n in range(int(ip_parts[2]), 256))
+ for ip_parts[2] in iter1:
+ yield '.'.join(ip_parts)
+
+ @staticmethod
+ def _update_cgnat_script_file(ip_pipeline_cfg, mcpi, vnf_str):
+ pipeline_config_str = str(ip_pipeline_cfg)
+ input_cmds = '\n'.join(mcpi)
+ icmp_flag = 'link 0 down' in input_cmds
+ if icmp_flag:
+ pipeline_config_str = ''
+ return '\n'.join([pipeline_config_str, input_cmds])
+
+ def scale(self, flavor=""):
+ raise NotImplementedError
+
+ def _get_cgnapt_config(self, interfaces=None):
+ if interfaces is None:
+ interfaces = self.vnfd_helper.interfaces
+
+ gateway_ips = []
+
+ # fixme: Get private port and gateway from port list
+ priv_ports = interfaces[::2]
+ for interface in priv_ports:
+ gateway_ips.append(self._get_ports_gateway(interface["name"]))
+ return gateway_ips
+
+
+class CgnaptApproxVnf(SampleVNF):
+
+ APP_NAME = "vCGNAPT"
+ APP_WORD = 'cgnapt'
+ COLLECT_KPI = CGNAPT_COLLECT_KPI
+
+ COLLECT_MAP = {
+ "packets_in": 2,
+ "packets_fwd": 5,
+ "packets_dropped": 4,
+ }
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = CgnaptApproxSetupEnvHelper
+
+ super(CgnaptApproxVnf, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
+
+ def _vnf_up_post(self):
+ super(CgnaptApproxVnf, self)._vnf_up_post()
+ if self.scenario_helper.options.get('napt', 'static') != 'static':
+ return
+
+ ip_iter = self.setup_helper._generate_ip_from_pool("152.16.40.10")
+ gw_ips = self.setup_helper._get_cgnapt_config()
+ if self.scenario_helper.vnf_cfg.get("lb_config", "SW") == 'HW':
+ pipeline = self.setup_helper.HW_DEFAULT_CORE
+ offset = 3
+ else:
+ pipeline = self.setup_helper.SW_DEFAULT_CORE - 1
+ offset = 0
+
+ worker_threads = int(self.scenario_helper.vnf_cfg["worker_threads"])
+ cmd_template = "p {0} entry addm {1} 1 {2} 1 0 32 65535 65535 65535"
+ for gw, ip in zip(gw_ips, ip_iter):
+ cmd = cmd_template.format(pipeline, gw, ip)
+ pipeline += worker_threads
+ pipeline += offset
+ self.vnf_execute(cmd)
+
+ time.sleep(WAIT_FOR_STATIC_NAPT)
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
new file mode 100644
index 000000000..90053bc36
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -0,0 +1,994 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Base class implementation for generic vnf implementation """
+
+from __future__ import absolute_import
+
+import posixpath
+import time
+import logging
+import os
+import re
+import subprocess
+from collections import Mapping
+
+from multiprocessing import Queue, Value, Process
+
+from six.moves import cStringIO
+
+from yardstick.benchmark.contexts.base import Context
+from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.network_services.helpers.cpu import CpuSysCores
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
+from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
+from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.network_services.utils import get_nsb_option
+
+from stl.trex_stl_lib.trex_stl_client import STLClient
+from stl.trex_stl_lib.trex_stl_client import LoggerApi
+from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError
+
+from yardstick.ssh import AutoConnectSSH
+
+DPDK_VERSION = "dpdk-16.07"
+
+LOG = logging.getLogger(__name__)
+
+
+REMOTE_TMP = "/tmp"
+
+
+class VnfSshHelper(AutoConnectSSH):
+
+ def __init__(self, node, bin_path, wait=None):
+ self.node = node
+ kwargs = self.args_from_node(self.node)
+ if wait:
+ kwargs.setdefault('wait', wait)
+
+ super(VnfSshHelper, self).__init__(**kwargs)
+ self.bin_path = bin_path
+
+ @staticmethod
+ def get_class():
+ # must return static class name, anything else refers to the calling class
+ # i.e. the subclass, not the superclass
+ return VnfSshHelper
+
+ def copy(self):
+ # this copy constructor is different from SSH classes, since it uses node
+ return self.get_class()(self.node, self.bin_path)
+
+ def upload_config_file(self, prefix, content):
+ cfg_file = os.path.join(REMOTE_TMP, prefix)
+ LOG.debug(content)
+ file_obj = cStringIO(content)
+ self.put_file_obj(file_obj, cfg_file)
+ return cfg_file
+
+ def join_bin_path(self, *args):
+ return os.path.join(self.bin_path, *args)
+
+ def provision_tool(self, tool_path=None, tool_file=None):
+ if tool_path is None:
+ tool_path = self.bin_path
+ return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
+
+
+class SetupEnvHelper(object):
+
+ CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
+ CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
+ CORES = []
+ DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
+ PIPELINE_COMMAND = ''
+ VNF_TYPE = "SAMPLE"
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(SetupEnvHelper, self).__init__()
+ self.vnfd_helper = vnfd_helper
+ self.ssh_helper = ssh_helper
+ self.scenario_helper = scenario_helper
+
+ def _get_ports_gateway(self, name):
+ routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
+ for route in routing_table:
+ if name == route['if']:
+ return route['gateway']
+ return None
+
+ def build_config(self):
+ raise NotImplementedError
+
+ def setup_vnf_environment(self):
+ pass
+ # raise NotImplementedError
+
+ def tear_down(self):
+ raise NotImplementedError
+
+
+class DpdkVnfSetupEnvHelper(SetupEnvHelper):
+
+ APP_NAME = 'DpdkVnf'
+ DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
+ DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
+ FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
+
+ HW_DEFAULT_CORE = 3
+ SW_DEFAULT_CORE = 2
+
+ DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")
+
+ @staticmethod
+ def _update_packet_type(ip_pipeline_cfg, traffic_options):
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ @classmethod
+ def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
+ traffic_type = traffic_options['traffic_type']
+
+ if traffic_options['vnf_type'] is not cls.APP_NAME:
+ match_str = 'traffic_type = 4'
+ replace_str = 'traffic_type = {0}'.format(traffic_type)
+
+ elif traffic_type == 4:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv4'
+
+ else:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv6'
+
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
+ self.all_ports = None
+ self.bound_pci = None
+ self._dpdk_nic_bind = None
+ self.socket = None
+
+ @property
+ def dpdk_nic_bind(self):
+ if self._dpdk_nic_bind is None:
+ self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
+ return self._dpdk_nic_bind
+
+ def _setup_hugepages(self):
+ cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
+ hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
+
+ memory_path = \
+ '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
+ self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
+
+ if hugepages == "2048kB":
+ pages = 16384
+ else:
+ pages = 16
+
+ self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
+
+ def _get_dpdk_port_num(self, name):
+ interface = self.vnfd_helper.find_interface(name=name)
+ return interface['virtual-interface']['dpdk_port_num']
+
+ def build_config(self):
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ task_path = self.scenario_helper.task_path
+
+ lb_count = vnf_cfg.get('lb_count', 3)
+ lb_config = vnf_cfg.get('lb_config', 'SW')
+ worker_config = vnf_cfg.get('worker_config', '1C/1T')
+ worker_threads = vnf_cfg.get('worker_threads', 3)
+
+ traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
+ traffic_options = {
+ 'traffic_type': traffic_type,
+ 'pkt_type': 'ipv%s' % traffic_type,
+ 'vnf_type': self.VNF_TYPE,
+ }
+
+ config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ multiport = MultiPortConfig(self.scenario_helper.topology,
+ config_tpl_cfg,
+ config_basename,
+ self.vnfd_helper.interfaces,
+ self.VNF_TYPE,
+ lb_count,
+ worker_threads,
+ worker_config,
+ lb_config,
+ self.socket)
+
+ multiport.generate_config()
+ with open(self.CFG_CONFIG) as handle:
+ new_config = handle.read()
+
+ new_config = self._update_traffic_type(new_config, traffic_options)
+ new_config = self._update_packet_type(new_config, traffic_options)
+
+ self.ssh_helper.upload_config_file(config_basename, new_config)
+ self.ssh_helper.upload_config_file(script_basename,
+ multiport.generate_script(self.vnfd_helper))
+ self.all_ports = multiport.port_pair_list
+
+ LOG.info("Provision and start the %s", self.APP_NAME)
+ self._build_pipeline_kwargs()
+ return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
+
+ def _build_pipeline_kwargs(self):
+ tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
+ ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
+ self.pipeline_kwargs = {
+ 'cfg_file': self.CFG_CONFIG,
+ 'script': self.CFG_SCRIPT,
+ 'ports_len_hex': ports_len_hex,
+ 'tool_path': tool_path,
+ }
+
+ def _get_app_cpu(self):
+ if self.CORES:
+ return self.CORES
+
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ sys_obj = CpuSysCores(self.ssh_helper)
+ self.sys_cpu = sys_obj.get_core_socket()
+ num_core = int(vnf_cfg["worker_threads"])
+ if vnf_cfg.get("lb_config", "SW") == 'HW':
+ num_core += self.HW_DEFAULT_CORE
+ else:
+ num_core += self.SW_DEFAULT_CORE
+ app_cpu = self.sys_cpu[str(self.socket)][:num_core]
+ return app_cpu
+
+ def _get_cpu_sibling_list(self, cores=None):
+ if cores is None:
+ cores = self._get_app_cpu()
+ sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
+ awk_template = "awk -F: '{ print $1 }' < %s"
+ sys_path = "/sys/devices/system/cpu/"
+ cpu_topology = []
+ try:
+ for core in cores:
+ sys_cmd = sys_cmd_template % (sys_path, core)
+ cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
+ cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
+
+ return cpu_topology
+ except Exception:
+ return []
+
+ def _validate_cpu_cfg(self):
+ return self._get_cpu_sibling_list()
+
+ def _find_used_drivers(self):
+ cmd = "{0} -s".format(self.dpdk_nic_bind)
+ rc, dpdk_status, _ = self.ssh_helper.execute(cmd)
+
+ self.used_drivers = {
+ vpci: (index, driver)
+ for index, (vpci, driver)
+ in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
+ if any(b.endswith(vpci) for b in self.bound_pci)
+ }
+
+ def setup_vnf_environment(self):
+ self._setup_dpdk()
+ resource = self._setup_resources()
+ self._kill_vnf()
+ self._detect_drivers()
+ return resource
+
+ def _kill_vnf(self):
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+
+ def _setup_dpdk(self):
+ """ setup dpdk environment needed for vnf to run """
+
+ self._setup_hugepages()
+ self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
+
+ exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
+ if exit_status == 0:
+ return
+
+ dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
+ dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
+ exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
+ if exit_status != 0:
+ self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+
+ def _setup_resources(self):
+ interfaces = self.vnfd_helper.interfaces
+ self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
+
+ # what is this magic? how do we know which socket is for which port?
+ # what about quad-socket?
+ if any(v[5] == "0" for v in self.bound_pci):
+ self.socket = 0
+ else:
+ self.socket = 1
+
+ cores = self._validate_cpu_cfg()
+ return ResourceProfile(self.vnfd_helper.mgmt_interface,
+ interfaces=self.vnfd_helper.interfaces, cores=cores)
+
+ def _detect_drivers(self):
+ interfaces = self.vnfd_helper.interfaces
+
+ self._find_used_drivers()
+ for vpci, (index, _) in self.used_drivers.items():
+ try:
+ intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
+ except StopIteration:
+ pass
+ else:
+ intf1['dpdk_port_num'] = index
+
+ for vpci in self.bound_pci:
+ self._bind_dpdk('igb_uio', vpci)
+ time.sleep(2)
+
+ def _bind_dpdk(self, driver, vpci, force=True):
+ if force:
+ force = '--force '
+ else:
+ force = ''
+ cmd = self.DPDK_BIND_CMD.format(force=force,
+ dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci)
+ self.ssh_helper.execute(cmd)
+
+ def _detect_and_bind_dpdk(self, vpci, driver):
+ find_net_cmd = self.FIND_NET_CMD.format(vpci)
+ exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status == 0:
+ # already bound
+ return None
+ self._bind_dpdk(driver, vpci)
+ exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status != 0:
+ # failed to bind
+ return None
+ return stdout
+
+ def _bind_kernel_devices(self):
+ for intf in self.vnfd_helper.interfaces:
+ vi = intf["virtual-interface"]
+ stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
+ if stdout is not None:
+ vi["local_iface_name"] = posixpath.basename(stdout)
+
+ def tear_down(self):
+ for vpci, (_, driver) in self.used_drivers.items():
+ self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci))
+
+
+class ResourceHelper(object):
+
+ COLLECT_KPI = ''
+ MAKE_INSTALL = 'cd {0} && make && sudo make install'
+ RESOURCE_WORD = 'sample'
+
+ COLLECT_MAP = {}
+
+ def __init__(self, setup_helper):
+ super(ResourceHelper, self).__init__()
+ self.resource = None
+ self.setup_helper = setup_helper
+ self.ssh_helper = setup_helper.ssh_helper
+
+ def setup(self):
+ self.resource = self.setup_helper.setup_vnf_environment()
+
+ def generate_cfg(self):
+ pass
+
+ def _collect_resource_kpi(self):
+ result = {}
+ status = self.resource.check_if_sa_running("collectd")[0]
+ if status:
+ result = self.resource.amqp_collect_nfvi_kpi()
+
+ result = {"core": result}
+ return result
+
+ def start_collect(self):
+ self.resource.initiate_systemagent(self.ssh_helper.bin_path)
+ self.resource.start()
+ self.resource.amqp_process_for_nfvi_kpi()
+
+ def stop_collect(self):
+ if self.resource:
+ self.resource.stop()
+
+ def collect_kpi(self):
+ return self._collect_resource_kpi()
+
+
+class ClientResourceHelper(ResourceHelper):
+
+ RUN_DURATION = 60
+ QUEUE_WAIT_TIME = 5
+ SYNC_PORT = 1
+ ASYNC_PORT = 2
+
+ def __init__(self, setup_helper):
+ super(ClientResourceHelper, self).__init__(setup_helper)
+ self.vnfd_helper = setup_helper.vnfd_helper
+ self.scenario_helper = setup_helper.scenario_helper
+
+ self.client = None
+ self.client_started = Value('i', 0)
+ self.my_ports = None
+ self._queue = Queue()
+ self._result = {}
+ self._terminated = Value('i', 0)
+ self._vpci_ascending = None
+
+ def _build_ports(self):
+ self.my_ports = [0, 1]
+
+ def get_stats(self, *args, **kwargs):
+ try:
+ return self.client.get_stats(*args, **kwargs)
+ except STLStateError:
+ LOG.exception("TRex client not connected")
+ return {}
+
+ def generate_samples(self, key=None, default=None):
+ last_result = self.get_stats(self.my_ports)
+ key_value = last_result.get(key, default)
+
+ if not isinstance(last_result, Mapping): # added for mock unit test
+ self._terminated.value = 1
+ return {}
+
+ samples = {}
+ for vpci_idx, vpci in enumerate(self._vpci_ascending):
+ name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
+ # fixme: VNFDs KPIs values needs to be mapped to TRex structure
+ xe_value = last_result.get(vpci_idx, {})
+ samples[name] = {
+ "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
+ "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
+ "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
+ "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
+ "in_packets": int(xe_value.get("ipackets", 0)),
+ "out_packets": int(xe_value.get("opackets", 0)),
+ }
+ if key:
+ samples[name][key] = key_value
+ return samples
+
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ samples = self.generate_samples()
+ time.sleep(self.QUEUE_WAIT_TIME)
+ self._queue.put(samples)
+
+ def run_traffic(self, traffic_profile):
+ # fixme: fix passing correct trex config file,
+ # instead of searching the default path
+ self._build_ports()
+ self.client = self._connect()
+ self.client.reset(ports=self.my_ports)
+ self.client.remove_all_streams(self.my_ports) # remove all streams
+ traffic_profile.register_generator(self)
+
+ while self._terminated.value == 0:
+ self._run_traffic_once(traffic_profile)
+
+ self.client.stop(self.my_ports)
+ self.client.disconnect()
+ self._terminated.value = 0
+
+ def terminate(self):
+ self._terminated.value = 1 # stop client
+
+ def clear_stats(self, ports=None):
+ if ports is None:
+ ports = self.my_ports
+ self.client.clear_stats(ports=ports)
+
+ def start(self, ports=None, *args, **kwargs):
+ if ports is None:
+ ports = self.my_ports
+ self.client.start(ports=ports, *args, **kwargs)
+
+ def collect_kpi(self):
+ if not self._queue.empty():
+ kpi = self._queue.get()
+ self._result.update(kpi)
+ LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result))
+ return self._result
+
+ def _connect(self, client=None):
+ if client is None:
+ client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
+ server=self.vnfd_helper.mgmt_interface["ip"],
+ verbose_level=LoggerApi.VERBOSE_QUIET)
+
+ # try to connect with 5s intervals, 30s max
+ for idx in range(6):
+ try:
+ client.connect()
+ break
+ except STLError:
+ LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
+ time.sleep(5)
+ return client
+
+
+class Rfc2544ResourceHelper(object):
+
+ DEFAULT_CORRELATED_TRAFFIC = False
+ DEFAULT_LATENCY = False
+ DEFAULT_TOLERANCE = '0.0001 - 0.0001'
+
+ def __init__(self, scenario_helper):
+ super(Rfc2544ResourceHelper, self).__init__()
+ self.scenario_helper = scenario_helper
+ self._correlated_traffic = None
+ self.iteration = Value('i', 0)
+ self._latency = None
+ self._rfc2544 = None
+ self._tolerance_low = None
+ self._tolerance_high = None
+
+ @property
+ def rfc2544(self):
+ if self._rfc2544 is None:
+ self._rfc2544 = self.scenario_helper.all_options['rfc2544']
+ return self._rfc2544
+
+ @property
+ def tolerance_low(self):
+ if self._tolerance_low is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_low
+
+ @property
+ def tolerance_high(self):
+ if self._tolerance_high is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_high
+
+ @property
+ def correlated_traffic(self):
+ if self._correlated_traffic is None:
+ self._correlated_traffic = \
+ self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
+
+ return self._correlated_traffic
+
+ @property
+ def latency(self):
+ if self._latency is None:
+ self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
+ return self._latency
+
+ def get_rfc2544(self, name, default=None):
+ return self.rfc2544.get(name, default)
+
+ def get_rfc_tolerance(self):
+ tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
+ tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
+ self._tolerance_low = next(tolerance_iter)
+ self._tolerance_high = next(tolerance_iter, self.tolerance_low)
+
+
+class SampleVNFDeployHelper(object):
+
+ SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
+ REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
+ SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
+
+ def __init__(self, vnfd_helper, ssh_helper):
+ super(SampleVNFDeployHelper, self).__init__()
+ self.ssh_helper = ssh_helper
+ self.vnfd_helper = vnfd_helper
+
+ DISABLE_DEPLOY = True
+
+ def deploy_vnfs(self, app_name):
+ # temp disable for now
+ if self.DISABLE_DEPLOY:
+ return
+
+ vnf_bin = self.ssh_helper.join_bin_path(app_name)
+ exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
+ if not exit_status:
+ return
+
+ subprocess.check_output(["rm", "-rf", self.REPO_NAME])
+ subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
+ time.sleep(2)
+ self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
+ self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
+
+ build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
+ time.sleep(2)
+ http_proxy = os.environ.get('http_proxy', '')
+ cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
+ LOG.debug(cmd)
+ self.ssh_helper.execute(cmd)
+ vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
+ self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
+ self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
+
+
+class ScenarioHelper(object):
+
+ DEFAULT_VNF_CFG = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1,
+ }
+
+ def __init__(self, name):
+ self.name = name
+ self.scenario_cfg = None
+
+ @property
+ def task_path(self):
+ return self.scenario_cfg["task_path"]
+
+ @property
+ def nodes(self):
+ return self.scenario_cfg['nodes']
+
+ @property
+ def all_options(self):
+ return self.scenario_cfg["options"]
+
+ @property
+ def options(self):
+ return self.all_options[self.name]
+
+ @property
+ def vnf_cfg(self):
+ return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
+
+ @property
+ def topology(self):
+ return self.scenario_cfg['topology']
+
+
+class SampleVNF(GenericVNF):
+ """ Class providing file-like API for generic VNF implementation """
+
+ VNF_PROMPT = "pipeline>"
+ WAIT_TIME = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNF, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.all_ports = None
+ self.context_cfg = None
+ self.nfvi_context = None
+ self.pipeline_kwargs = {}
+ self.priv_ports = None
+ self.pub_ports = None
+ # TODO(esm): make QueueFileWrapper invert-able so that we
+ # never have to manage the queues
+ self.q_in = Queue()
+ self.q_out = Queue()
+ self.queue_wrapper = None
+ self.run_kwargs = {}
+ self.scenario_cfg = None
+ self.tg_port_pairs = None
+ self.used_drivers = {}
+ self.vnf_port_pairs = None
+ self._vnf_process = None
+
+ def _get_route_data(self, route_index, route_type):
+ route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
+ for _ in range(route_index):
+ next(route_iter, '')
+ return next(route_iter, {}).get(route_type, '')
+
+ def _get_port0localip6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0localip6 : %s", return_value)
+ return return_value
+
+ def _get_port1localip6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1localip6 : %s", return_value)
+ return return_value
+
+ def _get_port0prefixlen6(self):
+ return_value = self._get_route_data(0, 'netmask')
+ LOG.info("_get_port0prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port1prefixlen6(self):
+ return_value = self._get_route_data(1, 'netmask')
+ LOG.info("_get_port1prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port0gateway6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0gateway6 : %s", return_value)
+ return return_value
+
+ def _get_port1gateway6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1gateway6 : %s", return_value)
+ return return_value
+
+ def _start_vnf(self):
+ self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
+ self._vnf_process = Process(target=self._run)
+ self._vnf_process.start()
+
+ def _vnf_up_post(self):
+ pass
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.context_cfg = context_cfg
+ self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
+ # self.nfvi_context = None
+
+ self.deploy_helper.deploy_vnfs(self.APP_NAME)
+ self.resource_helper.setup()
+ self._start_vnf()
+
+ def wait_for_instantiate(self):
+ buf = []
+ time.sleep(self.WAIT_TIME) # Give some time for config to load
+ while True:
+ if not self._vnf_process.is_alive():
+ raise RuntimeError("%s VNF process died." % self.APP_NAME)
+
+ # TODO(esm): move to QueueFileWrapper
+ while self.q_out.qsize() > 0:
+ buf.append(self.q_out.get())
+ message = ''.join(buf)
+ if self.VNF_PROMPT in message:
+ LOG.info("%s VNF is up and running.", self.APP_NAME)
+ self._vnf_up_post()
+ self.queue_wrapper.clear()
+ self.resource_helper.start_collect()
+ return self._vnf_process.exitcode
+
+ if "PANIC" in message:
+ raise RuntimeError("Error starting %s VNF." %
+ self.APP_NAME)
+
+ LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
+ time.sleep(1)
+
+ def _build_run_kwargs(self):
+ self.run_kwargs = {
+ 'stdin': self.queue_wrapper,
+ 'stdout': self.queue_wrapper,
+ 'keep_stdin_open': True,
+ 'pty': True,
+ }
+
+ def _build_config(self):
+ return self.setup_helper.build_config()
+
+ def _run(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+ cmd = self._build_config()
+ # kill before starting
+ self.ssh_helper.execute("pkill {}".format(self.APP_NAME))
+
+ LOG.debug(cmd)
+ self._build_run_kwargs()
+ self.ssh_helper.run(cmd, **self.run_kwargs)
+
+ def vnf_execute(self, cmd, wait_time=2):
+ """ send cmd to vnf process """
+
+ LOG.info("%s command: %s", self.APP_NAME, cmd)
+ self.q_in.put("{}\r\n".format(cmd))
+ time.sleep(wait_time)
+ output = []
+ while self.q_out.qsize() > 0:
+ output.append(self.q_out.get())
+ return "".join(output)
+
+ def _tear_down(self):
+ pass
+
+ def terminate(self):
+ self.vnf_execute("quit")
+ if self._vnf_process:
+ self._vnf_process.terminate()
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+ self._tear_down()
+ self.resource_helper.stop_collect()
+
+ def get_stats(self, *args, **kwargs):
+ """
+ Method for checking the statistics
+
+ :return:
+ VNF statistics
+ """
+ cmd = 'p {0} stats'.format(self.APP_WORD)
+ out = self.vnf_execute(cmd)
+ return out
+
+ def collect_kpi(self):
+ stats = self.get_stats()
+ m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if m:
+ result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
+ result["collect_stats"] = self.resource_helper.collect_kpi()
+ else:
+ result = {
+ "packets_in": 0,
+ "packets_fwd": 0,
+ "packets_dropped": 0,
+ }
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+
+class SampleVNFTrafficGen(GenericTrafficGen):
+ """ Class providing file-like API for generic traffic generator """
+
+ APP_NAME = 'Sample'
+ RUN_WAIT = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+ self.name = "tgen__1" # name in topology file
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ClientResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.runs_traffic = True
+ self.traffic_finished = False
+ self.tg_port_pairs = None
+ self._tg_process = None
+ self._traffic_process = None
+
+ def _start_server(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.resource_helper.generate_cfg()
+ self.setup_helper.setup_vnf_environment()
+ self.resource_helper.setup()
+
+ LOG.info("Starting %s server...", self.APP_NAME)
+ self._tg_process = Process(target=self._start_server)
+ self._tg_process.start()
+
+ def wait_for_instantiate(self):
+ return self._wait_for_process()
+
+ def _check_status(self):
+ raise NotImplementedError
+
+ def _wait_for_process(self):
+ while True:
+ if not self._tg_process.is_alive():
+ raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
+ LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
+ time.sleep(1)
+ status = self._check_status()
+ if status == 0:
+ LOG.info("%s TG Server is up and running.", self.APP_NAME)
+ return self._tg_process.exitcode
+
+ def _traffic_runner(self, traffic_profile):
+ LOG.info("Starting %s client...", self.APP_NAME)
+ self.resource_helper.run_traffic(traffic_profile)
+
+ def run_traffic(self, traffic_profile):
+ """ Generate traffic on the wire according to the given params.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Mandatory.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ self._traffic_process = Process(target=self._traffic_runner,
+ args=(traffic_profile,))
+ self._traffic_process.start()
+ # Wait for traffic process to start
+ while self.resource_helper.client_started.value == 0:
+ time.sleep(self.RUN_WAIT)
+
+ return self._traffic_process.is_alive()
+
+ def listen_traffic(self, traffic_profile):
+ """ Listen to traffic with the given parameters.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Optional.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ pass
+
+ def verify_traffic(self, traffic_profile):
+ """ Verify captured traffic after it has ended. Optional.
+
+ :param traffic_profile:
+ :return: dict
+ """
+ pass
+
+ def collect_kpi(self):
+ result = self.resource_helper.collect_kpi()
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+ def terminate(self):
+ """ After this method finishes, all traffic processes should stop. Mandatory.
+
+ :return: True/False
+ """
+ self.traffic_finished = True
+ if self._traffic_process is not None:
+ self._traffic_process.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ixload.py b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py
new file mode 100644
index 000000000..c15f7b954
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py
@@ -0,0 +1,176 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import csv
+import glob
+import logging
+import os
+import shutil
+
+from collections import OrderedDict
+from subprocess import call
+
+import six
+
+from yardstick.common.utils import makedirs
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
+
+LOG = logging.getLogger(__name__)
+
+VNF_PATH = os.path.dirname(os.path.realpath(__file__))
+
+MOUNT_CMD = """\
+mount.cifs //{0[ip]}/Results {1.RESULTS_MOUNT} \
+-o username={0[user]},password={0[password]}\
+"""
+
+IXLOAD_CONFIG_TEMPLATE = '''\
+{
+ "ixia_chassis": "%s",
+ "IXIA": {
+ "ports": %s,
+ "card": %s
+ },
+ "remote_server": "%s",
+ "result_dir": "%s",
+ "ixload_cfg": '"C:/Results/%s"
+}'''
+
+IXLOAD_CMD = "{ixloadpy} {http_ixload} {args}"
+
+
+class ResourceDataHelper(list):
+
+ def get_aggregates(self):
+ return {
+ "min": min(self),
+ "max": max(self),
+ "avg": sum(self) / len(self),
+ }
+
+
+class IxLoadResourceHelper(ClientResourceHelper):
+
+ RESULTS_MOUNT = "/mnt/Results"
+
+ KPI_LIST = OrderedDict((
+ ('http_throughput', 'HTTP Total Throughput (Kbps)'),
+ ('simulated_users', 'HTTP Simulated Users'),
+ ('concurrent_connections', 'HTTP Concurrent Connections'),
+ ('connection_rate', 'HTTP Connection Rate'),
+ ('transaction_rate', 'HTTP Transaction Rate'),
+ ))
+
+ def __init__(self, setup_helper):
+ super(IxLoadResourceHelper, self).__init__(setup_helper)
+ self.result = OrderedDict((key, ResourceDataHelper()) for key in self.KPI_LIST)
+ self.resource_file_name = ''
+
+ def parse_csv_read(self, reader):
+ for row in reader:
+ try:
+ new_data = {key_left: int(row[key_right])
+ for key_left, key_right in self.KPI_LIST.items()}
+ except (TypeError, ValueError):
+ continue
+ else:
+ for key, value in new_data.items():
+ self.result[key].append(value)
+
+ def setup(self):
+ # TODO: fixupt scenario_helper to hanlde ixia
+ self.resource_file_name = str(self.scenario_helper.scenario_cfg['ixia_profile'])
+ makedirs(self.RESULTS_MOUNT)
+ cmd = MOUNT_CMD.format(self.vnfd_helper.mgmt_interface, self)
+ LOG.debug(cmd)
+
+ if not os.path.ismount(self.RESULTS_MOUNT):
+ call(cmd, shell=True)
+
+ shutil.rmtree(self.RESULTS_MOUNT, ignore_errors=True)
+ makedirs(self.RESULTS_MOUNT)
+ shutil.copy(self.resource_file_name, self.RESULTS_MOUNT)
+
+ def make_aggregates(self):
+ return {key_right: self.result[key_left].get_aggregates()
+ for key_left, key_right in self.KPI_LIST.items()}
+
+ def log(self):
+ for key in self.KPI_LIST:
+ LOG.debug(self.result[key])
+
+
+class IxLoadTrafficGen(SampleVNFTrafficGen):
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = IxLoadResourceHelper
+
+ super(IxLoadTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
+ self._result = {}
+ self.done = False
+ self.data = None
+
+ def run_traffic(self, traffic_profile):
+ ports = []
+ card = None
+ for interface in self.vnfd_helper.interfaces:
+ vpci_list = interface['virtual-interface']["vpci"].split(":")
+ card = vpci_list[0]
+ ports.append(vpci_list[1])
+
+ for csv_file in glob.iglob(self.ssh_helper.join_bin_path('*.csv')):
+ os.unlink(csv_file)
+
+ ixia_config = self.vnfd_helper.mgmt_interface["tg-config"]
+ ixload_config = IXLOAD_CONFIG_TEMPLATE % (
+ ixia_config["ixchassis"], ports, card,
+ self.vnfd_helper.mgmt_interface["ip"], self.ssh_helper.bin_path,
+ os.path.basename(self.resource_helper.resource_file_name))
+
+ http_ixload_path = os.path.join(VNF_PATH, "../../traffic_profile")
+ cmd = IXLOAD_CMD.format(
+ ixloadpy=os.path.join(ixia_config["py_bin_path"], "ixloadpython"),
+ http_ixload=os.path.join(http_ixload_path, "http_ixload.py"),
+ args="'%s'" % ixload_config)
+
+ LOG.debug(cmd)
+ call(cmd, shell=True)
+
+ with open(self.ssh_helper.join_bin_path("ixLoad_HTTP_Client.csv")) as csv_file:
+ lines = csv_file.readlines()[10:]
+
+ with open(self.ssh_helper.join_bin_path("http_result.csv"), 'wb+') as result_file:
+ result_file.writelines(six.text_type(lines[:-1]))
+ result_file.flush()
+ result_file.seek(0)
+ reader = csv.DictReader(result_file)
+ self.resource_helper.parse_csv_read(reader)
+
+ self.resource_helper.log()
+ self.data = self.resource_helper.make_aggregates()
+
+ def listen_traffic(self, traffic_profile):
+ pass
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ super(IxLoadTrafficGen, self).instantiate(scenario_cfg, context_cfg)
+ self.done = False
+
+ def terminate(self):
+ call(["pkill", "-9", "http_ixload.py"])
+ super(IxLoadTrafficGen, self).terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
index 000a91db4..e65296287 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
@@ -16,14 +16,13 @@
from __future__ import absolute_import
from __future__ import print_function
import logging
-import multiprocessing
import re
-import time
-import os
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import provision_tool
+from multiprocessing import Queue
+from ipaddress import IPv4Interface
+
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
LOG = logging.getLogger(__name__)
@@ -42,77 +41,59 @@ class PingParser(object):
if match:
# IMPORTANT: in order for the data to be properly taken
# in by InfluxDB, it needs to be converted to numeric types
- self.queue.put({"packets_received": float(match.group(1)),
- "rtt": float(match.group(2))})
+ self.queue.put({
+ "packets_received": float(match.group(1)),
+ "rtt": float(match.group(2)),
+ })
def close(self):
- ''' close the ssh connection '''
- pass
+ """ close the ssh connection """
+ self.closed = True
def clear(self):
- ''' clear queue till Empty '''
+ """ clear queue till Empty """
while self.queue.qsize() > 0:
self.queue.get()
-class PingTrafficGen(GenericTrafficGen):
+class PingSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ def setup_vnf_environment(self):
+ self._bind_kernel_devices()
+
+
+class PingTrafficGen(SampleVNFTrafficGen):
"""
This traffic generator can ping a single IP with pingsize
and target given in traffic profile
"""
- def __init__(self, vnfd):
- super(PingTrafficGen, self).__init__(vnfd)
+ TG_NAME = 'Ping'
+ RUN_WAIT = 4
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = PingSetupEnvHelper
+
+ super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
+ self._queue = Queue()
+ self._parser = PingParser(self._queue)
self._result = {}
- self._parser = None
- self._queue = None
- self._traffic_process = None
-
- mgmt_interface = vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- def _bind_device_kernel(self, connection):
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
- drivers = {intf["virtual-interface"]["vpci"]:
- intf["virtual-interface"]["driver"]
- for intf in self.vnfd["vdu"][0]["external-interface"]}
-
- commands = \
- ['"{0}" --force -b "{1}" "{2}"'.format(dpdk_nic_bind, value, key)
- for key, value in drivers.items()]
- for command in commands:
- connection.execute(command)
-
- for index, out in enumerate(self.vnfd["vdu"][0]["external-interface"]):
- vpci = out["virtual-interface"]["vpci"]
- net = "find /sys/class/net -lname '*{}*' -printf '%f'".format(vpci)
- out = connection.execute(net)[1]
- ifname = out.split('/')[-1].strip('\n')
- self.vnfd["vdu"][0]["external-interface"][index][
- "virtual-interface"]["local_iface_name"] = ifname
def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(PingTrafficGen, self).scale(flavor)
+ """ scale vnf-based on flavor input """
+ pass
- def instantiate(self, scenario_cfg, context_cfg):
- self._result = {"packets_received": 0, "rtt": 0}
- self._bind_device_kernel(self.connection)
+ def _check_status(self):
+ return self._tg_process.is_alive()
- def run_traffic(self, traffic_profile):
- self._queue = multiprocessing.Queue()
- self._parser = PingParser(self._queue)
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._parser))
- self._traffic_process.start()
- # Wait for traffic process to start
- time.sleep(4)
- return self._traffic_process.is_alive()
+ def instantiate(self, scenario_cfg, context_cfg):
+ self._result = {
+ "packets_received": 0,
+ "rtt": 0,
+ }
+ self.setup_helper.setup_vnf_environment()
def listen_traffic(self, traffic_profile):
""" Not needed for ping
@@ -122,38 +103,26 @@ class PingTrafficGen(GenericTrafficGen):
"""
pass
- def _traffic_runner(self, traffic_profile, filewrapper):
-
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
- external_interface = self.vnfd["vdu"][0]["external-interface"]
- virtual_interface = external_interface[0]["virtual-interface"]
- target_ip = virtual_interface["dst_ip"].split('/')[0]
- local_ip = virtual_interface["local_ip"].split('/')[0]
- local_if_name = \
- virtual_interface["local_iface_name"].split('/')[0]
- packet_size = traffic_profile.params["traffic_profile"]["frame_size"]
-
- run_cmd = []
-
- run_cmd.append("ip addr flush %s" % local_if_name)
- run_cmd.append("ip addr add %s/24 dev %s" % (local_ip, local_if_name))
- run_cmd.append("ip link set %s up" % local_if_name)
-
- for cmd in run_cmd:
- self.connection.execute(cmd)
-
- ping_cmd = ("ping -s %s %s" % (packet_size, target_ip))
- self.connection.run(ping_cmd, stdout=filewrapper,
+ def _traffic_runner(self, traffic_profile):
+ intf = self.vnfd_helper.interfaces[0]["virtual-interface"]
+ profile = traffic_profile.params["traffic_profile"]
+ cmd_kwargs = {
+ 'target_ip': IPv4Interface(intf["dst_ip"]).ip.exploded,
+ 'local_ip': IPv4Interface(intf["local_ip"]).ip.exploded,
+ 'local_if_name': intf["local_iface_name"].split('/')[0],
+ 'packet_size': profile["frame_size"],
+ }
+
+ cmd_list = [
+ "sudo ip addr flush {local_if_name}",
+ "sudo ip addr add {local_ip}/24 dev {local_if_name}",
+ "sudo ip link set {local_if_name} up",
+ ]
+
+ for cmd in cmd_list:
+ self.ssh_helper.execute(cmd.format(**cmd_kwargs))
+
+ ping_cmd = "ping -s {packet_size} {target_ip}"
+ self.ssh_helper.run(ping_cmd.format(**cmd_kwargs),
+ stdout=self._parser,
keep_stdin_open=True, pty=True)
-
- def collect_kpi(self):
- if not self._queue.empty():
- kpi = self._queue.get()
- self._result.update(kpi)
- return self._result
-
- def terminate(self):
- if self._traffic_process is not None:
- self._traffic_process.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
new file mode 100644
index 000000000..07bbdae95
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
@@ -0,0 +1,165 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import time
+import os
+import logging
+import sys
+
+from yardstick.common.utils import ErrorClass
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
+
+try:
+ from IxNet import IxNextgen
+except ImportError:
+ IxNextgen = ErrorClass
+
+LOG = logging.getLogger(__name__)
+
+WAIT_AFTER_CFG_LOAD = 10
+WAIT_FOR_TRAFFIC = 30
+IXIA_LIB = os.path.dirname(os.path.realpath(__file__))
+IXNET_LIB = os.path.join(IXIA_LIB, "../../libs/ixia_libs/IxNet")
+sys.path.append(IXNET_LIB)
+
+
+class IxiaRfc2544Helper(Rfc2544ResourceHelper):
+
+ pass
+
+
+class IxiaResourceHelper(ClientResourceHelper):
+
+ def __init__(self, setup_helper, rfc_helper_type=None):
+ super(IxiaResourceHelper, self).__init__(setup_helper)
+ self.scenario_helper = setup_helper.scenario_helper
+
+ self.client = IxNextgen()
+
+ if rfc_helper_type is None:
+ rfc_helper_type = IxiaRfc2544Helper
+
+ self.rfc_helper = rfc_helper_type(self.scenario_helper)
+ self.tg_port_pairs = []
+ self.priv_ports = None
+ self.pub_ports = None
+
+ def _connect(self, client=None):
+ self.client.connect(self.vnfd_helper)
+
+ def _build_ports(self):
+ # self.generate_port_pairs(self.topology)
+ self.priv_ports = [int(x[0][-1]) for x in self.tg_port_pairs]
+ self.pub_ports = [int(x[1][-1]) for x in self.tg_port_pairs]
+ self.my_ports = list(set(self.priv_ports).union(set(self.pub_ports)))
+
+ def get_stats(self, *args, **kwargs):
+ return self.client.ix_get_statistics()[1]
+
+ def stop_collect(self):
+ self._terminated.value = 0
+ if self.client:
+ self.client.ix_stop_traffic()
+
+ def generate_samples(self, key=None, default=None):
+ last_result = self.get_stats()
+
+ samples = {}
+ for vpci_idx, interface in enumerate(self.vnfd_helper.interfaces):
+ name = "xe{0}".format(vpci_idx)
+ samples[name] = {
+ "rx_throughput_kps": float(last_result["Rx_Rate_Kbps"][vpci_idx]),
+ "tx_throughput_kps": float(last_result["Tx_Rate_Kbps"][vpci_idx]),
+ "rx_throughput_mbps": float(last_result["Rx_Rate_Mbps"][vpci_idx]),
+ "tx_throughput_mbps": float(last_result["Tx_Rate_Mbps"][vpci_idx]),
+ "in_packets": int(last_result["Valid_Frames_Rx"][vpci_idx]),
+ "out_packets": int(last_result["Frames_Tx"][vpci_idx]),
+ "RxThroughput": int(last_result["Valid_Frames_Rx"][vpci_idx]) / 30,
+ "TxThroughput": int(last_result["Frames_Tx"][vpci_idx]) / 30,
+ }
+
+ return samples
+
+ def run_traffic(self, traffic_profile):
+ min_tol = self.rfc_helper.tolerance_low
+ max_tol = self.rfc_helper.tolerance_high
+
+ self._build_ports()
+ self._connect()
+
+ # we don't know client_file_name until runtime as instantiate
+ client_file_name = self.scenario_helper.scenario_cfg['ixia_profile']
+ self.client.ix_load_config(client_file_name)
+ time.sleep(WAIT_AFTER_CFG_LOAD)
+
+ self.client.ix_assign_ports()
+
+ mac = {}
+ for index, interface in enumerate(self.vnfd_helper.interfaces):
+ virt_intf = interface["virtual-interface"]
+ mac.update({
+ "src_mac_{}".format(index): virt_intf["local_mac"],
+ "dst_mac_{}".format(index): virt_intf["dst_mac"],
+ })
+
+ samples = {}
+ ixia_file = os.path.join(os.getcwd(), "ixia_traffic.cfg")
+ # Generate ixia traffic config...
+ while not self._terminated.value:
+ traffic_profile.execute(self, self.client, mac, ixia_file)
+ self.client_started.value = 1
+ time.sleep(WAIT_FOR_TRAFFIC)
+ self.client.ix_stop_traffic()
+ samples = self.generate_samples()
+ self._queue.put(samples)
+ status, samples = traffic_profile.get_drop_percentage(self, samples, min_tol,
+ max_tol, self.client, mac,
+ ixia_file)
+
+ current = samples['CurrentDropPercentage']
+ if min_tol <= current <= max_tol or status == 'Completed':
+ self._terminated.value = 1
+
+ self.client.ix_stop_traffic()
+ self._queue.put(samples)
+
+
+class IxiaTrafficGen(SampleVNFTrafficGen):
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = IxiaResourceHelper
+
+ super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
+ self._ixia_traffic_gen = None
+ self.ixia_file_name = ''
+ self.tg_port_pairs = []
+ self.vnf_port_pairs = []
+
+ def _check_status(self):
+ pass
+
+ def scale(self, flavor=""):
+ pass
+
+ def listen_traffic(self, traffic_profile):
+ pass
+
+ def terminate(self):
+ self.resource_helper.stop_collect()
+ super(IxiaTrafficGen, self).terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
index 7da4b31e9..79e42e0a8 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
@@ -15,267 +15,98 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
import time
import logging
-import os
-import yaml
+from collections import Mapping
+from itertools import chain
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import get_nsb_option
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper
LOGGING = logging.getLogger(__name__)
-DURATION = 30
-WAIT_TIME = 3
-TREX_SYNC_PORT = 4500
-TREX_ASYNC_PORT = 4501
+class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper):
-class TrexTrafficGenRFC(GenericTrafficGen):
- """
- This class handles mapping traffic profile and generating
- traffic for rfc2544 testcase.
- """
-
- def __init__(self, vnfd):
- super(TrexTrafficGenRFC, self).__init__(vnfd)
- self._result = {}
- self._terminated = multiprocessing.Value('i', 0)
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.tc_file_name = None
- self.client = None
- self.my_ports = None
-
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
-
- def _generate_trex_cfg(self, vnfd):
- """
-
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
-
- cfg_file = []
- vpci = []
- port = {}
-
- ext_intf = vnfd["vdu"][0]["external-interface"]
- for interface in ext_intf:
- virt_intf = interface["virtual-interface"]
- vpci.append(virt_intf["vpci"])
-
- port["src_mac"] = \
- self._split_mac_address_into_list(virt_intf["local_mac"])
-
- time.sleep(WAIT_TIME)
- port["dest_mac"] = \
- self._split_mac_address_into_list(virt_intf["dst_mac"])
- if virt_intf["dst_mac"]:
- trex_cfg["port_info"].append(port.copy())
-
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
-
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
-
- self._vpci_ascending = sorted(vpci)
-
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGenRFC, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
- trex = os.path.join(self.bin_path, "trex")
- err, _, _ = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))
- if err != 0:
- self.connection.put(trex, trex, True)
+ def is_done(self):
+ return self.latency and self.iteration.value > 10
- LOGGING.debug("Starting TRex server...")
- _tg_server = \
- multiprocessing.Process(target=self._start_server)
- _tg_server.start()
- while True:
- LOGGING.info("Waiting for TG Server to start.. ")
- time.sleep(WAIT_TIME)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOGGING.info("TG server is up and running.")
- return _tg_server.exitcode
- if not _tg_server.is_alive():
- raise RuntimeError("Traffic Generator process died.")
+class TrexRfcResourceHelper(TrexResourceHelper):
- def listen_traffic(self, traffic_profile):
- pass
+ LATENCY_TIME_SLEEP = 120
+ RUN_DURATION = 30
+ WAIT_TIME = 3
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
+ def __init__(self, setup_helper, rfc_helper_type=None):
+ super(TrexRfcResourceHelper, self).__init__(setup_helper)
- def run_traffic(self, traffic_profile,
- client_started=multiprocessing.Value('i', 0)):
+ if rfc_helper_type is None:
+ rfc_helper_type = TrexRfc2544ResourceHelper
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- client_started, self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while client_started.value == 0:
- time.sleep(1)
+ self.rfc2544_helper = rfc_helper_type(self.scenario_helper)
+ # self.tg_port_pairs = []
- return self._traffic_process.is_alive()
+ def _build_ports(self):
+ self.tg_port_pairs, self.networks = MultiPortConfig.get_port_pairs(
+ self.vnfd_helper.interfaces)
+ self.priv_ports = [int(x[0][-1]) for x in self.tg_port_pairs]
+ self.pub_ports = [int(x[1][-1]) for x in self.tg_port_pairs]
+ self.my_ports = list(set(chain(self.priv_ports, self.pub_ports)))
- def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
-
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
- _server.execute("pkill -9 rex > /dev/null 2>&1")
-
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
-
- _server.execute(trex_cmd)
-
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOGGING.info("Unable to connect to Trex. Attempt %s", idx)
- time.sleep(WAIT_TIME)
- return client
-
- @classmethod
- def _get_rfc_tolerance(cls, tc_yaml):
- tolerance = '0.8 - 1.0'
- if 'tc_options' in tc_yaml['scenarios'][0]:
- tc_options = tc_yaml['scenarios'][0]['tc_options']
- if 'rfc2544' in tc_options:
- tolerance = \
- tc_options['rfc2544'].get('allowed_drop_rate', '0.8 - 1.0')
-
- tolerance = tolerance.split('-')
- min_tol = float(tolerance[0])
- if len(tolerance) == 2:
- max_tol = float(tolerance[1])
- else:
- max_tol = float(tolerance[0])
-
- return [min_tol, max_tol]
-
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOGGING.info("Starting TRex client...")
- tc_yaml = {}
-
- with open(self.tc_file_name) as tc_file:
- tc_yaml = yaml.load(tc_file.read())
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ self.client.stop(self.my_ports)
+ time.sleep(self.WAIT_TIME)
+ samples = traffic_profile.get_drop_percentage(self)
+ self._queue.put(samples)
- tolerance = self._get_rfc_tolerance(tc_yaml)
+ if not self.rfc2544_helper.is_done():
+ return
- # fixme: fix passing correct trex config file,
- # instead of searching the default path
- self.my_ports = [0, 1]
- self.client = self._connect_client()
+ self.client.stop(self.my_ports)
self.client.reset(ports=self.my_ports)
- self.client.remove_all_streams(self.my_ports) # remove all streams
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- time.sleep(DURATION)
+ self.client.remove_all_streams(self.my_ports)
+ traffic_profile.execute_latency(samples=samples)
+ multiplier = traffic_profile.calculate_pps(samples)[1]
+ for _ in range(5):
+ time.sleep(self.LATENCY_TIME_SLEEP)
self.client.stop(self.my_ports)
- time.sleep(WAIT_TIME)
+ time.sleep(self.WAIT_TIME)
last_res = self.client.get_stats(self.my_ports)
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- if not isinstance(last_res, dict):
- terminated.value = 1
- last_res = {}
+ if not isinstance(last_res, Mapping):
+ self._terminated.value = 1
+ continue
+ self.generate_samples('latency', {})
+ self._queue.put(samples)
+ self.client.start(mult=str(multiplier),
+ ports=self.my_ports,
+ duration=120, force=True)
- samples[name] = \
- {"rx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("rx_pps", 0.0)),
- "tx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("tx_pps", 0.0)),
- "rx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("rx_bps", 0.0)),
- "tx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("tx_bps", 0.0)),
- "in_packets":
- last_res.get(vpci_idx, {}).get("ipackets", 0),
- "out_packets":
- last_res.get(vpci_idx, {}).get("opackets", 0)}
+ def start_client(self, mult, duration, force=True):
+ self.client.start(ports=self.my_ports, mult=mult, duration=duration, force=force)
- samples = \
- traffic_profile.get_drop_percentage(self, samples,
- tolerance[0], tolerance[1])
- queue.put(samples)
- self.client.stop(self.my_ports)
- self.client.disconnect()
- queue.put(samples)
+ def clear_client_stats(self):
+ self.client.clear_stats(ports=self.my_ports)
def collect_kpi(self):
- if not self._queue.empty():
- result = self._queue.get()
- self._result.update(result)
- LOGGING.debug("trex collect Kpis %s", self._result)
- return self._result
+ self.rfc2544_helper.iteration.value += 1
+ super(TrexRfcResourceHelper, self).collect_kpi()
+
- def terminate(self):
- self._terminated.value = 1 # stop Trex clinet
+class TrexTrafficGenRFC(TrexTrafficGen):
+ """
+ This class handles mapping traffic profile and generating
+ traffic for rfc2544 testcase.
+ """
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexRfcResourceHelper
- if self._traffic_process:
- self._traffic_process.terminate()
+ super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
index 058b715fe..616b331ba 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
@@ -15,261 +15,136 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
-import time
+
import logging
import os
+
import yaml
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.common.utils import mac_address_to_hex_list
from yardstick.network_services.utils import get_nsb_option
-from yardstick.network_services.utils import provision_tool
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
LOG = logging.getLogger(__name__)
-DURATION = 30
-WAIT_QUEUE = 1
-TREX_SYNC_PORT = 4500
-TREX_ASYNC_PORT = 4501
-class TrexTrafficGen(GenericTrafficGen):
+class TrexResourceHelper(ClientResourceHelper):
+
+ CONF_FILE = '/tmp/trex_cfg.yaml'
+ QUEUE_WAIT_TIME = 1
+ RESOURCE_WORD = 'trex'
+ RUN_DURATION = 0
+
+ SYNC_PORT = 4500
+ ASYNC_PORT = 4501
+
+ def generate_cfg(self):
+ ext_intf = self.vnfd_helper.interfaces
+ vpci_list = []
+ port_list = []
+ trex_cfg = {
+ 'port_limit': 0,
+ 'version': '2',
+ 'interfaces': vpci_list,
+ 'port_info': port_list,
+ "port_limit": len(ext_intf),
+ "version": '2',
+ }
+ cfg_file = [trex_cfg]
+
+ for interface in ext_intf:
+ virtual_interface = interface['virtual-interface']
+ vpci_list.append(virtual_interface["vpci"])
+ dst_mac = virtual_interface["dst_mac"]
+
+ if not dst_mac:
+ continue
+
+ local_mac = virtual_interface["local_mac"]
+ port_list.append({
+ "src_mac": mac_address_to_hex_list(local_mac),
+ "dest_mac": mac_address_to_hex_list(dst_mac),
+ })
+
+ cfg_str = yaml.safe_dump(cfg_file, default_flow_style=False, explicit_start=True)
+ self.ssh_helper.upload_config_file(os.path.basename(self.CONF_FILE), cfg_str)
+ self._vpci_ascending = sorted(vpci_list)
+
+ def check_status(self):
+ status, _, _ = self.ssh_helper.execute("sudo lsof -i:%s" % self.SYNC_PORT)
+ return status
+
+ # temp disable
+ DISABLE_DEPLOY = True
+
+ def setup(self):
+ if self.DISABLE_DEPLOY:
+ return
+
+ trex_path = self.ssh_helper.join_bin_path('trex')
+
+ err = self.ssh_helper.execute("which {}".format(trex_path))[0]
+ if err == 0:
+ return
+
+ LOG.info("Copying %s to destination...", self.RESOURCE_WORD)
+ self.ssh_helper.run("sudo mkdir -p '{}'".format(os.path.dirname(trex_path)))
+ self.ssh_helper.put("~/.bash_profile", "~/.bash_profile")
+ self.ssh_helper.put(trex_path, trex_path, True)
+ ko_src = os.path.join(trex_path, "scripts/ko/src/")
+ self.ssh_helper.execute(self.MAKE_INSTALL.format(ko_src))
+
+ def start(self, ports=None, *args, **kwargs):
+ cmd = "sudo fuser -n tcp {0.SYNC_PORT} {0.ASYNC_PORT} -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd.format(self))
+
+ self.ssh_helper.execute("sudo pkill -9 rex > /dev/null 2>&1")
+
+ trex_path = self.ssh_helper.join_bin_path("trex", "scripts")
+ path = get_nsb_option("trex_path", trex_path)
+
+ # cmd = "sudo ./t-rex-64 -i --cfg %s > /dev/null 2>&1" % self.CONF_FILE
+ cmd = "./t-rex-64 -i --cfg '{}'".format(self.CONF_FILE)
+
+ # if there are errors we want to see them
+ # we have to sudo cd because the path might be owned by root
+ trex_cmd = """sudo bash -c "cd '{}' ; {}" >/dev/null""".format(path, cmd)
+ self.ssh_helper.execute(trex_cmd)
+
+ def terminate(self):
+ super(TrexResourceHelper, self).terminate()
+ cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT))
+
+
+class TrexTrafficGen(SampleVNFTrafficGen):
"""
This class handles mapping traffic profile and generating
traffic for given testcase
"""
- def __init__(self, vnfd):
- super(TrexTrafficGen, self).__init__(vnfd)
- self._result = {}
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.client = None
- self.my_ports = None
- self.client_started = multiprocessing.Value('i', 0)
-
- mgmt_interface = vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
-
- def _generate_trex_cfg(self, vnfd):
- """
-
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
-
- cfg_file = []
- vpci = []
- port = {}
-
- for interface in range(len(vnfd["vdu"][0]["external-interface"])):
- ext_intrf = vnfd["vdu"][0]["external-interface"]
- virtual_interface = ext_intrf[interface]["virtual-interface"]
- vpci.append(virtual_interface["vpci"])
-
- port["src_mac"] = self._split_mac_address_into_list(
- virtual_interface["local_mac"])
- port["dest_mac"] = self._split_mac_address_into_list(
- virtual_interface["dst_mac"])
-
- trex_cfg["port_info"].append(port.copy())
-
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
-
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
-
- self._vpci_ascending = sorted(vpci)
-
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
-
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
-
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
-
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
-
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
+ APP_NAME = 'TRex'
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexResourceHelper
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGen, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.setup_vnf_environment(self.connection)
-
- trex = os.path.join(self.bin_path, "trex")
- err = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))[0]
- if err != 0:
- LOG.info("Copying trex to destination...")
- self.connection.put("/root/.bash_profile", "/root/.bash_profile")
- self.connection.put(trex, trex, True)
- ko_src = os.path.join(trex, "scripts/ko/src/")
- self.connection.execute("cd %s && make && make install" % ko_src)
-
- LOG.info("Starting TRex server...")
- _tg_process = \
- multiprocessing.Process(target=self._start_server)
- _tg_process.start()
- while True:
- if not _tg_process.is_alive():
- raise RuntimeError("Traffic Generator process died.")
- LOG.info("Waiting for TG Server to start.. ")
- time.sleep(1)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOG.info("TG server is up and running.")
- return _tg_process.exitcode
-
- def listen_traffic(self, traffic_profile):
- pass
+ super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
-
- def run_traffic(self, traffic_profile):
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- self.client_started,
- self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while self.client_started.value == 0:
- time.sleep(1)
-
- return self._traffic_process.is_alive()
+ def _check_status(self):
+ return self.resource_helper.check_status()
def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
+ super(TrexTrafficGen, self)._start_server()
+ self.resource_helper.start()
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
+ def scale(self, flavor=""):
+ pass
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
-
- _server.execute(trex_cmd)
-
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- # try to connect with 5s intervals, 30s max
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
- time.sleep(5)
- return client
-
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOG.info("Starting TRex client...")
-
- self.my_ports = [0, 1]
- self.client = self._connect_client()
- self.client.reset(ports=self.my_ports)
-
- self.client.remove_all_streams(self.my_ports) # remove all streams
-
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- last_res = self.client.get_stats(self.my_ports)
- if not isinstance(last_res, dict): # added for mock unit test
- terminated.value = 1
- last_res = {}
-
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- xe_value = last_res.get(vpci_idx, {})
- samples[name] = \
- {"rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
- "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
- "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
- "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
- "in_packets": xe_value.get("ipackets", 0),
- "out_packets": xe_value.get("opackets", 0)}
- time.sleep(WAIT_QUEUE)
- queue.put(samples)
-
- self.client.disconnect()
- terminated.value = 0
-
- def collect_kpi(self):
- if not self._queue.empty():
- self._result.update(self._queue.get())
- LOG.debug("trex collect Kpis %s", self._result)
- return self._result
+ def listen_traffic(self, traffic_profile):
+ pass
def terminate(self):
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
- self.traffic_finished = True
- if self._traffic_process:
- self._traffic_process.terminate()
+ self.resource_helper.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py
new file mode 100644
index 000000000..32a08c7bd
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py
@@ -0,0 +1,67 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import logging
+
+from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+from yardstick.network_services.yang_model import YangModel
+
+LOG = logging.getLogger(__name__)
+
+# vFW should work the same on all systems, we can provide the binary
+FW_PIPELINE_COMMAND = """sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}"""
+
+FW_COLLECT_KPI = (r"""VFW TOTAL:[^p]+pkts_received"?:\s(\d+),[^p]+pkts_fw_forwarded"?:\s(\d+),"""
+ r"""[^p]+pkts_drop_fw"?:\s(\d+),\s""")
+
+
+class FWApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ APP_NAME = "vFW"
+ CFG_CONFIG = "/tmp/vfw_config"
+ CFG_SCRIPT = "/tmp/vfw_script"
+ DEFAULT_CONFIG_TPL_CFG = "vfw.cfg"
+ PIPELINE_COMMAND = FW_PIPELINE_COMMAND
+ SW_DEFAULT_CORE = 5
+ HW_DEFAULT_CORE = 2
+ VNF_TYPE = "VFW"
+
+
+class FWApproxVnf(SampleVNF):
+
+ APP_NAME = "vFW"
+ APP_WORD = 'vfw'
+ COLLECT_KPI = FW_COLLECT_KPI
+
+ COLLECT_MAP = {
+ 'packets_in': 1,
+ 'packets_fwd': 2,
+ 'packets_dropped': 3,
+ }
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = FWApproxSetupEnvHelper
+
+ super(FWApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
+ self.vfw_rules = None
+
+ def _start_vnf(self):
+ yang_model_path = find_relative_file(self.scenario_helper.options['rules'],
+ self.scenario_helper.task_path)
+ yang_model = YangModel(yang_model_path)
+ self.vfw_rules = yang_model.get_rules()
+ super(FWApproxVnf, self)._start_vnf()
diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
index e9e80bdfb..310ab67cb 100644
--- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
@@ -15,313 +15,268 @@
from __future__ import absolute_import
from __future__ import print_function
-import tempfile
-import time
import os
import logging
import re
-from multiprocessing import Queue
-import multiprocessing
-import ipaddress
-import six
+import posixpath
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
-from yardstick.network_services.utils import provision_tool
-from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
-from yardstick.network_services.nfvi.resource import ResourceProfile
+from six.moves import configparser, zip
-LOG = logging.getLogger(__name__)
-VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}'
-CORES = ['0', '1', '2']
-WAIT_TIME = 20
+from yardstick.network_services.pipeline import PipelineRules
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+LOG = logging.getLogger(__name__)
-class VpeApproxVnf(GenericVNF):
+VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}"""
+
+VPE_COLLECT_KPI = """\
+Pkts in:\s(\d+)\r\n\
+\tPkts dropped by Pkts in:\s(\d+)\r\n\
+\tPkts dropped by AH:\s(\d+)\r\n\\
+\tPkts dropped by other:\s(\d+)\
+"""
+
+
+class ConfigCreate(object):
+
+ @staticmethod
+ def vpe_tmq(config, index):
+ tm_q = 'TM{0}'.format(index)
+ config.add_section(tm_q)
+ config.set(tm_q, 'burst_read', '24')
+ config.set(tm_q, 'burst_write', '32')
+ config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg')
+ return config
+
+ def __init__(self, priv_ports, pub_ports, socket):
+ super(ConfigCreate, self).__init__()
+ self.sw_q = -1
+ self.sink_q = -1
+ self.n_pipeline = 1
+ self.priv_ports = priv_ports
+ self.pub_ports = pub_ports
+ self.pipeline_per_port = 9
+ self.socket = socket
+
+ def vpe_initialize(self, config):
+ config.add_section('EAL')
+ config.set('EAL', 'log_level', '0')
+
+ config.add_section('PIPELINE0')
+ config.set('PIPELINE0', 'type', 'MASTER')
+ config.set('PIPELINE0', 'core', 's%sC0' % self.socket)
+
+ config.add_section('MEMPOOL0')
+ config.set('MEMPOOL0', 'pool_size', '256K')
+
+ config.add_section('MEMPOOL1')
+ config.set('MEMPOOL1', 'pool_size', '2M')
+ return config
+
+ def vpe_rxq(self, config):
+ for port in self.pub_ports:
+ new_section = 'RXQ{0}.0'.format(port)
+ config.add_section(new_section)
+ config.set(new_section, 'mempool', 'MEMPOOL1')
+
+ return config
+
+ def get_sink_swq(self, parser, pipeline, k, index):
+ sink = ""
+ pktq = parser.get(pipeline, k)
+ if "SINK" in pktq:
+ self.sink_q += 1
+ sink = " SINK{0}".format(self.sink_q)
+ if "TM" in pktq:
+ sink = " TM{0}".format(index)
+ pktq = "SWQ{0}{1}".format(self.sw_q, sink)
+ return pktq
+
+ def vpe_upstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_upstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ if k == "pktq_in":
+ index = intf['index']
+ if "RXQ" in v:
+ value = "RXQ{0}.0".format(index)
+ else:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ elif k == "pktq_out":
+ index = intf['peer_intf']['index']
+ if "TXQ" in v:
+ value = "TXQ{0}.0".format(index)
+ else:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def vpe_downstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_downstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ index = intf['dpdk_port_num']
+ peer_index = intf['peer_intf']['dpdk_port_num']
+
+ if k == "pktq_in":
+ if "RXQ" not in v:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "RXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "RXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ if k == "pktq_out":
+ if "TXQ" not in v:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "TXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "TXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def create_vpe_config(self, vnf_cfg):
+ config = configparser.ConfigParser()
+ vpe_cfg = os.path.join("/tmp/vpe_config")
+ with open(vpe_cfg, 'w') as cfg_file:
+ config = self.vpe_initialize(config)
+ config = self.vpe_rxq(config)
+ config.write(cfg_file)
+ for index, priv_port in enumerate(self.priv_ports):
+ config = self.vpe_upstream(vnf_cfg, priv_port)
+ config.write(cfg_file)
+ config = self.vpe_downstream(vnf_cfg, priv_port)
+ config = self.vpe_tmq(config, index)
+ config.write(cfg_file)
+
+ def generate_vpe_script(self, interfaces):
+ rules = PipelineRules(pipeline_id=1)
+ for priv_port, pub_port in zip(self.priv_ports, self.pub_ports):
+ priv_intf = interfaces[priv_port]["virtual-interface"]
+ pub_intf = interfaces[pub_port]["virtual-interface"]
+
+ dst_port0_ip = priv_intf["dst_ip"]
+ dst_port1_ip = pub_intf["dst_ip"]
+ dst_port0_mac = priv_intf["dst_mac"]
+ dst_port1_mac = pub_intf["dst_mac"]
+
+ rules.add_firewall_script(dst_port0_ip)
+ rules.next_pipeline()
+ rules.add_flow_classification_script()
+ rules.next_pipeline()
+ rules.add_flow_action()
+ rules.next_pipeline()
+ rules.add_flow_action2()
+ rules.next_pipeline()
+ rules.add_route_script(dst_port1_ip, dst_port1_mac)
+ rules.next_pipeline()
+ rules.add_route_script2(dst_port0_ip, dst_port0_mac)
+ rules.next_pipeline(num=4)
+
+ return rules.get_string()
+
+
+class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ CFG_CONFIG = "/tmp/vpe_config"
+ CFG_SCRIPT = "/tmp/vpe_script"
+ CORES = ['0', '1', '2', '3', '4', '5']
+ PIPELINE_COMMAND = VPE_PIPELINE_COMMAND
+
+ def build_config(self):
+ vpe_vars = {
+ "bin_path": self.ssh_helper.bin_path,
+ "socket": self.socket,
+ }
+
+ all_ports = []
+ priv_ports = []
+ pub_ports = []
+ for interface in self.vnfd_helper.interfaces:
+ all_ports.append(interface['name'])
+ vld_id = interface['virtual-interface']['vld_id']
+ if vld_id.startswith('private'):
+ priv_ports.append(interface)
+ elif vld_id.startswith('public'):
+ pub_ports.append(interface)
+
+ vpe_conf = ConfigCreate(priv_ports, pub_ports, self.socket)
+ vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg)
+
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ with open(self.CFG_CONFIG) as handle:
+ vpe_config = handle.read()
+
+ self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars))
+
+ vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces)
+ self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars))
+
+
+class VpeApproxVnf(SampleVNF):
""" This class handles vPE VNF model-driver definitions """
- def __init__(self, vnfd):
- super(VpeApproxVnf, self).__init__(vnfd)
- self.socket = None
- self.q_in = Queue()
- self.q_out = Queue()
- self.vnf_cfg = None
- self._vnf_process = None
- self.connection = None
- self.resource = None
-
- def _resource_collect_start(self):
- self.resource.initiate_systemagent(self.bin_path)
- self.resource.start()
+ APP_NAME = 'vPE_vnf'
+ APP_WORD = 'vpe'
+ COLLECT_KPI = VPE_COLLECT_KPI
+ WAIT_TIME = 20
- def _resource_collect_stop(self):
- self.resource.stop()
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = VpeApproxSetupEnvHelper
- def _collect_resource_kpi(self):
- result = {}
+ super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
- status = self.resource.check_if_sa_running("collectd")[0]
- if status:
- result = self.resource.amqp_collect_nfvi_kpi()
-
- result = {"core": result}
-
- return result
-
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
-
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
-
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
-
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
-
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
-
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
-
- def _get_cpu_sibling_list(self):
- cpu_topo = []
- for core in CORES:
- sys_cmd = \
- "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \
- % core
- cpuid = \
- self.connection.execute("awk -F: '{ print $1 }' < %s" %
- sys_cmd)[1]
- cpu_topo += \
- [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')]
-
- return [cpu.strip() for cpu in cpu_topo]
-
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(VpeApproxVnf, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg']
-
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
-
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
-
- self.setup_vnf_environment(self.connection)
-
- cores = self._get_cpu_sibling_list()
- self.resource = ResourceProfile(self.vnfd, cores)
-
- self.connection.execute("pkill vPE_vnf")
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- self.socket = \
- next((0 for v in interfaces
- if v['virtual-interface']["vpci"][5] == "0"), 1)
-
- bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
- for vpci in bound_pci:
- self.connection.execute(
- "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci))
- queue_wrapper = \
- QueueFileWrapper(self.q_in, self.q_out, "pipeline>")
- self._vnf_process = multiprocessing.Process(target=self._run_vpe,
- args=(queue_wrapper,
- vnf_cfg,))
- self._vnf_process.start()
- buf = []
- time.sleep(WAIT_TIME) # Give some time for config to load
- while True:
- message = ''
- while self.q_out.qsize() > 0:
- buf.append(self.q_out.get())
- message = ''.join(buf)
- if "pipeline>" in message:
- LOG.info("VPE VNF is up and running.")
- queue_wrapper.clear()
- self._resource_collect_start()
- return self._vnf_process.exitcode
- if "PANIC" in message:
- raise RuntimeError("Error starting vPE VNF.")
-
- LOG.info("Waiting for VNF to start.. ")
- time.sleep(3)
- if not self._vnf_process.is_alive():
- raise RuntimeError("vPE VNF process died.")
-
- def _get_ports_gateway(self, name):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
-
- for route in routing_table:
- if name == route['if']:
- return route['gateway']
-
- def terminate(self):
- self.execute_command("quit")
- if self._vnf_process:
- self._vnf_process.terminate()
-
- def _run_vpe(self, filewrapper, vnf_cfg):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- port0_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"],
- interfaces[0]["virtual-interface"]["netmask"])))
- port1_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"],
- interfaces[1]["virtual-interface"]["netmask"])))
- dst_port0_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"],
- interfaces[0]["virtual-interface"]["netmask"]))
- dst_port1_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"],
- interfaces[1]["virtual-interface"]["netmask"]))
-
- vpe_vars = {"port0_local_ip": port0_ip.ip.exploded,
- "port0_dst_ip": dst_port0_ip.ip.exploded,
- "port0_local_ip_hex":
- self._ip_to_hex(port0_ip.ip.exploded),
- "port0_prefixlen": port0_ip.network.prefixlen,
- "port0_netmask": port0_ip.network.netmask.exploded,
- "port0_netmask_hex":
- self._ip_to_hex(port0_ip.network.netmask.exploded),
- "port0_local_mac":
- interfaces[0]["virtual-interface"]["local_mac"],
- "port0_dst_mac":
- interfaces[0]["virtual-interface"]["dst_mac"],
- "port0_gateway":
- self._get_ports_gateway(interfaces[0]["name"]),
- "port0_local_network":
- port0_ip.network.network_address.exploded,
- "port0_prefix": port0_ip.network.prefixlen,
- "port1_local_ip": port1_ip.ip.exploded,
- "port1_dst_ip": dst_port1_ip.ip.exploded,
- "port1_local_ip_hex":
- self._ip_to_hex(port1_ip.ip.exploded),
- "port1_prefixlen": port1_ip.network.prefixlen,
- "port1_netmask": port1_ip.network.netmask.exploded,
- "port1_netmask_hex":
- self._ip_to_hex(port1_ip.network.netmask.exploded),
- "port1_local_mac":
- interfaces[1]["virtual-interface"]["local_mac"],
- "port1_dst_mac":
- interfaces[1]["virtual-interface"]["dst_mac"],
- "port1_gateway":
- self._get_ports_gateway(interfaces[1]["name"]),
- "port1_local_network":
- port1_ip.network.network_address.exploded,
- "port1_prefix": port1_ip.network.prefixlen,
- "port0_local_ip6": self._get_port0localip6(),
- "port1_local_ip6": self._get_port1localip6(),
- "port0_prefixlen6": self._get_port0prefixlen6(),
- "port1_prefixlen6": self._get_port1prefixlen6(),
- "port0_gateway6": self._get_port0gateway6(),
- "port1_gateway6": self._get_port1gateway6(),
- "port0_dst_ip_hex6": self._get_port0localip6(),
- "port1_dst_ip_hex6": self._get_port1localip6(),
- "port0_dst_netmask_hex6": self._get_port0prefixlen6(),
- "port1_dst_netmask_hex6": self._get_port1prefixlen6(),
- "bin_path": self.bin_path,
- "socket": self.socket}
-
- for cfg in os.listdir(vnf_cfg):
- vpe_config = ""
- with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg:
- vpe_config = vpe_cfg.read()
-
- self._provide_config_file(cfg, vpe_config, vpe_vars)
-
- LOG.info("Provision and start the vPE")
- tool_path = provision_tool(self.connection,
- os.path.join(self.bin_path, "vPE_vnf"))
- cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config",
- script="/tmp/vpe_script",
- tool_path=tool_path)
- self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper,
- keep_stdin_open=True, pty=True)
-
- def _provide_config_file(self, prefix, template, args):
- cfg, cfg_content = tempfile.mkstemp()
- cfg = os.fdopen(cfg, "w+")
- cfg.write(template.format(**args))
- cfg.close()
- cfg_file = "/tmp/%s" % prefix
- self.connection.put(cfg_content, cfg_file)
- return cfg_file
-
- def execute_command(self, cmd):
- ''' send cmd to vnf process '''
- LOG.info("VPE command: %s", cmd)
- output = []
- if self.q_in:
- self.q_in.put(cmd + "\r\n")
- time.sleep(3)
- while self.q_out.qsize() > 0:
- output.append(self.q_out.get())
- return "".join(output)
+ def get_stats(self, *args, **kwargs):
+ raise NotImplementedError
def collect_kpi(self):
- result = self.get_stats_vpe()
- collect_stats = self._collect_resource_kpi()
- result["collect_stats"] = collect_stats
- LOG.debug("vPE collet Kpis: %s", result)
- return result
-
- def get_stats_vpe(self):
- ''' get vpe statistics '''
- result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0,
- 'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0}
- up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0',
- 'p 5 stats port out 1']
- down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0']
- pattern = \
- "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \
- "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)"
-
- for cmd in up_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_up_stream"] = \
- result.get("pkt_in_up_stream", 0) + int(match.group(1))
- result["pkt_drop_up_stream"] = \
- result.get("pkt_drop_up_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
-
- for cmd in down_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_down_stream"] = \
- result.get("pkt_in_down_stream", 0) + int(match.group(1))
- result["pkt_drop_down_stream"] = \
- result.get("pkt_drop_down_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
+ result = {
+ 'pkt_in_up_stream': 0,
+ 'pkt_drop_up_stream': 0,
+ 'pkt_in_down_stream': 0,
+ 'pkt_drop_down_stream': 0,
+ 'collect_stats': self.resource_helper.collect_kpi(),
+ }
+
+ indexes_in = [1]
+ indexes_drop = [2, 3]
+ command = 'p {0} stats port {1} 0'
+ for index, direction in ((5, 'up'), (9, 'down')):
+ key_in = "pkt_in_{0}_stream".format(direction)
+ key_drop = "pkt_drop_{0}_stream".format(direction)
+ for mode in ('in', 'out'):
+ stats = self.vnf_execute(command.format(index, mode))
+ match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if not match:
+ continue
+ result[key_in] += sum(int(match.group(x)) for x in indexes_in)
+ result[key_drop] += sum(int(match.group(x)) for x in indexes_drop)
+
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
return result
diff --git a/yardstick/network_services/vnf_generic/vnfdgen.py b/yardstick/network_services/vnf_generic/vnfdgen.py
index b56a91915..0120b493e 100644
--- a/yardstick/network_services/vnf_generic/vnfdgen.py
+++ b/yardstick/network_services/vnf_generic/vnfdgen.py
@@ -14,11 +14,16 @@
""" Generic file to map and build vnf discriptor """
from __future__ import absolute_import
-import collections
+from functools import reduce
import jinja2
+import logging
import yaml
+from yardstick.common.utils import try_int
+
+LOG = logging.getLogger(__name__)
+
def render(vnf_model, **kwargs):
"""Render jinja2 VNF template
@@ -40,7 +45,8 @@ def generate_vnfd(vnf_model, node):
as input for GenericVNF.__init__
"""
# get is unused as global method inside template
- node["get"] = get
+ # node["get"] = key_flatten_get
+ node["get"] = deepgetitem
# Set Node details to default if not defined in pod file
# we CANNOT use TaskTemplate.render because it does not allow
# for missing variables, we need to allow password for key_filename
@@ -52,36 +58,34 @@ def generate_vnfd(vnf_model, node):
return filled_vnfd
-def dict_key_flatten(data):
- """ Convert nested dict structure to dotted key
- (e.g. {"a":{"b":1}} -> {"a.b":1}
-
- :param data: nested dictionary
- :return: flat dicrionary
- """
- next_data = {}
-
- # check for non-string iterables
- if not any((isinstance(v, collections.Iterable) and not isinstance(v, str))
- for v in data.values()):
- return data
+# dict_flatten was causing recursion errors with Jinja2 so we removed and replaced
+# which this function from stackoverflow that doesn't require generating entire dictionaries
+# each time we query a key
+def deepgetitem(obj, item, default=None):
+ """Steps through an item chain to get the ultimate value.
- for key, val in data.items():
- if isinstance(val, collections.Mapping):
- for n_k, n_v in val.items():
- next_data["%s.%s" % (key, n_k)] = n_v
- elif isinstance(val, collections.Iterable) and not isinstance(val,
- str):
- for index, item in enumerate(val):
- next_data["%s%d" % (key, index)] = item
- else:
- next_data[key] = val
+ If ultimate value or path to value does not exist, does not raise
+ an exception and instead returns `fallback`.
- return dict_key_flatten(next_data)
+ Based on
+ https://stackoverflow.com/a/38623359
+ https://stackoverflow.com/users/1820042/donny-winston
+ add try_int to work with sequences
-def get(obj, key, *args):
- """ Get template key from dictionary, get default value or raise an exception
+ >>> d = {'snl_final': {'about': {'_icsd': {'icsd_id': 1, 'fr': [2, 3]}}}}
+ >>> deepgetitem(d, 'snl_final.about._icsd.icsd_id')
+ 1
+ >>> deepgetitem(d, 'snl_final.about._sandbox.sbx_id')
+ >>>
+ >>> deepgetitem(d, 'snl_final.about._icsd.fr.1')
+ 3
"""
- data = dict_key_flatten(obj)
- return data.get(key, *args)
+ def getitem(obj, name):
+ # if integer then list index
+ name = try_int(name)
+ try:
+ return obj[name]
+ except (KeyError, TypeError, IndexError):
+ return default
+ return reduce(getitem, item.split('.'), obj)
diff --git a/yardstick/network_services/yang_model.py b/yardstick/network_services/yang_model.py
new file mode 100644
index 000000000..fbf224bd8
--- /dev/null
+++ b/yardstick/network_services/yang_model.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import print_function
+import logging
+import ipaddress
+import yaml
+import six
+
+LOG = logging.getLogger(__name__)
+
+
+class YangModel(object):
+
+ RULE_TEMPLATE = "p acl add 1 {0} {1} {2} {3} {4} {5} {6} {7} 0 0 {8}"
+
+ def __init__(self, config_file):
+ super(YangModel, self).__init__()
+ self._config_file = config_file
+ self._options = {}
+ self._rules = ''
+
+ @property
+ def config_file(self):
+ return self._config_file
+
+ @config_file.setter
+ def config_file(self, value):
+ self._config_file = value
+ self._options = {}
+ self._rules = ''
+
+ def _read_config(self):
+ # TODO: add some error handling in case of empty or non-existing file
+ try:
+ with open(self._config_file) as f:
+ self._options = yaml.safe_load(f)
+ except Exception as e:
+ LOG.exception("Failed to load the yaml %s", e)
+ raise
+
+ def _get_entries(self):
+ if not self._options:
+ return ''
+
+ rule_list = []
+ for ace in self._options['access-list1']['acl']['access-list-entries']:
+ # TODO: resolve ports using topology file and nodes'
+ # ids: public or private.
+ matches = ace['ace']['matches']
+ dst_ipv4_net = matches['destination-ipv4-network']
+ dst_ipv4_net_ip = ipaddress.ip_interface(six.text_type(dst_ipv4_net))
+ port0_local_network = dst_ipv4_net_ip.network.network_address.exploded
+ port0_prefix = dst_ipv4_net_ip.network.prefixlen
+
+ src_ipv4_net = matches['source-ipv4-network']
+ src_ipv4_net_ip = ipaddress.ip_interface(six.text_type(src_ipv4_net))
+ port1_local_network = src_ipv4_net_ip.network.network_address.exploded
+ port1_prefix = src_ipv4_net_ip.network.prefixlen
+
+ lower_dport = matches['destination-port-range']['lower-port']
+ upper_dport = matches['destination-port-range']['upper-port']
+
+ lower_sport = matches['source-port-range']['lower-port']
+ upper_sport = matches['source-port-range']['upper-port']
+
+ # TODO: proto should be read from file also.
+ # Now all rules in sample ACL file are TCP.
+ rule_list.append('') # get an extra new line
+ rule_list.append(self.RULE_TEMPLATE.format(port0_local_network,
+ port0_prefix,
+ port1_local_network,
+ port1_prefix,
+ lower_dport,
+ upper_dport,
+ lower_sport,
+ upper_sport,
+ 0))
+ rule_list.append(self.RULE_TEMPLATE.format(port1_local_network,
+ port1_prefix,
+ port0_local_network,
+ port0_prefix,
+ lower_sport,
+ upper_sport,
+ lower_dport,
+ upper_dport,
+ 1))
+
+ self._rules = '\n'.join(rule_list)
+
+ def get_rules(self):
+ if not self._rules:
+ self._read_config()
+ self._get_entries()
+ return self._rules