diff options
Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf')
29 files changed, 5901 insertions, 702 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/acl_vnf.py b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py index 1390dd02e..69d29bf76 100644 --- a/yardstick/network_services/vnf_generic/vnf/acl_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,19 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import -from __future__ import print_function import logging +import ipaddress +import six +from yardstick.common import utils +from yardstick.common import exceptions -from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper -from yardstick.network_services.yang_model import YangModel +from yardstick.network_services.helpers.samplevnf_helper import PortPairs +from itertools import chain LOG = logging.getLogger(__name__) # ACL should work the same on all systems, we can provide the binary ACL_PIPELINE_COMMAND = \ - 'sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script}' + 'sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script} {hwlb}' ACL_COLLECT_KPI = r"""\ ACL TOTAL:[^p]+pkts_processed"?:\s(\d+),[^p]+pkts_drop"?:\s(\d+),[^p]+pkts_received"?:\s(\d+),""" @@ -40,6 +42,196 @@ class AclApproxSetupEnvSetupEnvHelper(DpdkVnfSetupEnvHelper): SW_DEFAULT_CORE = 5 DEFAULT_CONFIG_TPL_CFG = "acl.cfg" VNF_TYPE = "ACL" + RULE_CMD = "acl" + + DEFAULT_PRIORITY = 1 + DEFAULT_PROTOCOL = 0 + DEFAULT_PROTOCOL_MASK = 0 + # Default actions to be applied to SampleVNF. Please note, + # that this list is extended with `fwd` action when default + # actions are generated. + DEFAULT_FWD_ACTIONS = ["accept", "count"] + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(AclApproxSetupEnvSetupEnvHelper, self).__init__(vnfd_helper, + ssh_helper, + scenario_helper) + self._action_id = 0 + + def get_ip_from_port(self, port): + # we can't use gateway because in OpenStack gateways interfere with floating ip routing + # return self.make_ip_addr(self.get_ports_gateway(port), self.get_netmask_gateway(port)) + vintf = self.vnfd_helper.find_interface(name=port)["virtual-interface"] + return utils.make_ip_addr(vintf["local_ip"], vintf["netmask"]) + + def get_network_and_prefixlen_from_ip_of_port(self, port): + ip_addr = self.get_ip_from_port(port) + # handle cases with no gateway + if ip_addr: + return ip_addr.network.network_address.exploded, ip_addr.network.prefixlen + else: + return None, None + + @property + def new_action_id(self): + """Get new action id""" + self._action_id += 1 + return self._action_id + + def get_default_flows(self): + """Get default actions/rules + Returns: (<actions>, <rules>) + <actions>: + { <action_id>: [ <list of actions> ]} + Example: + { 0 : [ "accept", "count", {"fwd" : "port": 0} ], ... } + <rules>: + [ {"src_ip": "x.x.x.x", "src_ip_mask", 24, ...}, ... ] + Note: + See `generate_rule_cmds()` to get list of possible map keys. + """ + actions, rules = {}, [] + _port_pairs = PortPairs(self.vnfd_helper.interfaces) + port_pair_list = _port_pairs.port_pair_list + for src_intf, dst_intf in port_pair_list: + # get port numbers of the interfaces + src_port = self.vnfd_helper.port_num(src_intf) + dst_port = self.vnfd_helper.port_num(dst_intf) + # get interface addresses and prefixes + src_net, src_prefix_len = self.get_network_and_prefixlen_from_ip_of_port(src_intf) + dst_net, dst_prefix_len = self.get_network_and_prefixlen_from_ip_of_port(dst_intf) + # ignore entries with empty values + if all((src_net, src_prefix_len, dst_net, dst_prefix_len)): + # flow: src_net:dst_net -> dst_port + action_id = self.new_action_id + actions[action_id] = self.DEFAULT_FWD_ACTIONS[:] + actions[action_id].append({"fwd": {"port": dst_port}}) + rules.append({"priority": 1, 'cmd': self.RULE_CMD, + "src_ip": src_net, "src_ip_mask": src_prefix_len, + "dst_ip": dst_net, "dst_ip_mask": dst_prefix_len, + "src_port_from": 0, "src_port_to": 65535, + "dst_port_from": 0, "dst_port_to": 65535, + "protocol": 0, "protocol_mask": 0, + "action_id": action_id}) + # flow: dst_net:src_net -> src_port + action_id = self.new_action_id + actions[action_id] = self.DEFAULT_FWD_ACTIONS[:] + actions[action_id].append({"fwd": {"port": src_port}}) + rules.append({"cmd":self.RULE_CMD, "priority": 1, + "src_ip": dst_net, "src_ip_mask": dst_prefix_len, + "dst_ip": src_net, "dst_ip_mask": src_prefix_len, + "src_port_from": 0, "src_port_to": 65535, + "dst_port_from": 0, "dst_port_to": 65535, + "protocol": 0, "protocol_mask": 0, + "action_id": action_id}) + return actions, rules + + def get_flows(self, options): + """Get actions/rules based on provided options. + The `options` is a dict representing the ACL rules configuration + file. Result is the same as described in `get_default_flows()`. + """ + actions, rules = {}, [] + for ace in options['access-list-entries']: + # Generate list of actions + action_id = self.new_action_id + actions[action_id] = ace['actions'] + # Destination nestwork + matches = ace['matches'] + dst_ipv4_net = matches['destination-ipv4-network'] + dst_ipv4_net_ip = ipaddress.ip_interface(six.text_type(dst_ipv4_net)) + # Source network + src_ipv4_net = matches['source-ipv4-network'] + src_ipv4_net_ip = ipaddress.ip_interface(six.text_type(src_ipv4_net)) + # Append the rule + rules.append({'action_id': action_id, 'cmd': self.RULE_CMD, + 'dst_ip': dst_ipv4_net_ip.network.network_address.exploded, + 'dst_ip_mask': dst_ipv4_net_ip.network.prefixlen, + 'src_ip': src_ipv4_net_ip.network.network_address.exploded, + 'src_ip_mask': src_ipv4_net_ip.network.prefixlen, + 'dst_port_from': matches['destination-port-range']['lower-port'], + 'dst_port_to': matches['destination-port-range']['upper-port'], + 'src_port_from': matches['source-port-range']['lower-port'], + 'src_port_to': matches['source-port-range']['upper-port'], + 'priority': matches.get('priority', self.DEFAULT_PRIORITY), + 'protocol': matches.get('protocol', self.DEFAULT_PROTOCOL), + 'protocol_mask': matches.get('protocol_mask', + self.DEFAULT_PROTOCOL_MASK) + }) + return actions, rules + + def generate_rule_cmds(self, rules, apply_rules=False): + """Convert rules into list of SampleVNF CLI commands""" + rule_template = ("p {cmd} add {priority} {src_ip} {src_ip_mask} " + "{dst_ip} {dst_ip_mask} {src_port_from} {src_port_to} " + "{dst_port_from} {dst_port_to} {protocol} " + "{protocol_mask} {action_id}") + rule_cmd_list = [] + for rule in rules: + rule_cmd_list.append(rule_template.format(**rule)) + if apply_rules: + # add command to apply all rules at the end + rule_cmd_list.append("p {cmd} applyruleset".format(cmd=self.RULE_CMD)) + return rule_cmd_list + + def generate_action_cmds(self, actions): + """Convert actions into list of SampleVNF CLI commands. + These method doesn't validate the provided list of actions. Supported + list of actions are limited by SampleVNF. Thus, the user should be + responsible to specify correct action name(s). Yardstick should take + the provided action by user and apply it to SampleVNF. + Anyway, some of the actions require addition parameters to be + specified. In case of `fwd` & `nat` action used have to specify + the port attribute. + """ + _action_template_map = { + "fwd": "p action add {action_id} fwd {port}", + "nat": "p action add {action_id} nat {port}" + } + action_cmd_list = [] + for action_id, actions in actions.items(): + for action in actions: + if isinstance(action, dict): + for action_name in action.keys(): + # user provided an action name with addition options + # e.g.: {"fwd": {"port": 0}} + # format action CLI command and add it to the list + if action_name not in _action_template_map.keys(): + raise exceptions.AclUnknownActionTemplate( + action_name=action_name) + template = _action_template_map[action_name] + try: + action_cmd_list.append(template.format( + action_id=action_id, **action[action_name])) + except KeyError as exp: + raise exceptions.AclMissingActionArguments( + action_name=action_name, + action_param=exp.args[0]) + else: + # user provided an action name w/o addition options + # e.g.: "accept", "count" + action_cmd_list.append( + "p action add {action_id} {action}".format( + action_id=action_id, action=action)) + return action_cmd_list + + def get_flows_config(self, options=None): + """Get action/rules configuration commands (string) to be + applied to SampleVNF to configure ACL rules (flows). + """ + action_cmd_list, rule_cmd_list = [], [] + if options: + # if file name is set, read actions/rules from the file + actions, rules = self.get_flows(options) + action_cmd_list = self.generate_action_cmds(actions) + rule_cmd_list = self.generate_rule_cmds(rules) + # default actions/rules + dft_actions, dft_rules = self.get_default_flows() + dft_action_cmd_list = self.generate_action_cmds(dft_actions) + dft_rule_cmd_list = self.generate_rule_cmds(dft_rules, apply_rules=True) + # generate multi-line commands to add actions/rules + return '\n'.join(chain(action_cmd_list, dft_action_cmd_list, + rule_cmd_list, dft_rule_cmd_list)) class AclApproxVnf(SampleVNF): @@ -59,11 +251,7 @@ class AclApproxVnf(SampleVNF): setup_env_helper_type = AclApproxSetupEnvSetupEnvHelper super(AclApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) - self.acl_rules = None - - def _start_vnf(self): - yang_model_path = find_relative_file(self.scenario_helper.options['rules'], - self.scenario_helper.task_path) - yang_model = YangModel(yang_model_path) - self.acl_rules = yang_model.get_rules() - super(AclApproxVnf, self)._start_vnf() + + def wait_for_instantiate(self): + """Wait for VNF to initialize""" + self.wait_for_initialize() diff --git a/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py new file mode 100644 index 000000000..d1d9667db --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py @@ -0,0 +1,46 @@ +# Copyright (c) 2018-2019 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from yardstick.network_services.vnf_generic.vnf import base + +LOG = logging.getLogger(__name__) + + +class AgnosticVnf(base.GenericVNF): + """ AgnosticVnf implementation. """ + def __init__(self, name, vnfd): + super(AgnosticVnf, self).__init__(name, vnfd) + + def instantiate(self, scenario_cfg, context_cfg): + pass + + def wait_for_instantiate(self): + pass + + def terminate(self): + pass + + def scale(self, flavor=""): + pass + + def collect_kpi(self): + pass + + def start_collect(self): + pass + + def stop_collect(self): + pass diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index a776b0989..8ef96b744 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Base class implementation for generic vnf implementation """ import abc @@ -95,7 +94,7 @@ class VnfdHelper(dict): for interface in self.interfaces: virtual_intf = interface["virtual-interface"] if virtual_intf[key] == value: - return interface + return virtual_intf raise KeyError() def find_interface(self, **kwargs): @@ -195,10 +194,22 @@ class GenericVNF(object): :return: {"kpi": value, "kpi2": value} """ + @abc.abstractmethod + def start_collect(self): + """Start KPI collection + :return: None + """ + + @abc.abstractmethod + def stop_collect(self): + """Stop KPI collection + :return: None + """ + @six.add_metaclass(abc.ABCMeta) class GenericTrafficGen(GenericVNF): - """ Class providing file-like API for generic traffic generator """ + """Class providing file-like API for generic traffic generator""" def __init__(self, name, vnfd): super(GenericTrafficGen, self).__init__(name, vnfd) @@ -254,3 +265,23 @@ class GenericTrafficGen(GenericVNF): :return: True/False """ pass + + def start_collect(self): + """Start KPI collection. + + Traffic measurements are always collected during injection. + + Optional. + + :return: True/False + """ + pass + + def stop_collect(self): + """Stop KPI collection. + + Optional. + + :return: True/False + """ + pass diff --git a/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py index 53f73b4d7..ee4a581b1 100644 --- a/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,10 +21,10 @@ from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, Dpd LOG = logging.getLogger(__name__) # CGNAPT should work the same on all systems, we can provide the binary -CGNAPT_PIPELINE_COMMAND = 'sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script}' +CGNAPT_PIPELINE_COMMAND = 'sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script} {hwlb}' WAIT_FOR_STATIC_NAPT = 4 -CGNAPT_COLLECT_KPI = """\ +CGNAPT_COLLECT_KPI = r"""\ CG-NAPT(.*\n)*\ Received\s(\d+),\ Missed\s(\d+),\ @@ -120,3 +120,7 @@ class CgnaptApproxVnf(SampleVNF): self.vnf_execute(cmd) time.sleep(WAIT_FOR_STATIC_NAPT) + + def wait_for_instantiate(self): + """Wait for VNF to initialize""" + self.wait_for_initialize() diff --git a/yardstick/network_services/vnf_generic/vnf/epc_vnf.py b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py new file mode 100644 index 000000000..8112963e9 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py @@ -0,0 +1,53 @@ +# Copyright (c) 2018-2019 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from yardstick.network_services.vnf_generic.vnf import base + +LOG = logging.getLogger(__name__) + + +class EPCVnf(base.GenericVNF): + + def __init__(self, name, vnfd): + super(EPCVnf, self).__init__(name, vnfd) + + def instantiate(self, scenario_cfg, context_cfg): + """Prepare VNF for operation and start the VNF process/VM + + :param scenario_cfg: Scenario config + :param context_cfg: Context config + """ + pass + + def wait_for_instantiate(self): + """Wait for VNF to start""" + pass + + def terminate(self): + """Kill all VNF processes""" + pass + + def scale(self, flavor=""): + pass + + def collect_kpi(self): + pass + + def start_collect(self): + pass + + def stop_collect(self): + pass diff --git a/yardstick/network_services/vnf_generic/vnf/ipsec_vnf.py b/yardstick/network_services/vnf_generic/vnf/ipsec_vnf.py new file mode 100644 index 000000000..1961ac1b1 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/ipsec_vnf.py @@ -0,0 +1,498 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re +import time +from collections import Counter +from enum import Enum + +from yardstick.benchmark.contexts.base import Context +from yardstick.common.process import check_if_process_failed +from yardstick.network_services import constants +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF +from yardstick.network_services.vnf_generic.vnf.vpp_helpers import \ + VppSetupEnvHelper, VppConfigGenerator + +LOG = logging.getLogger(__name__) + + +class CryptoAlg(Enum): + """Encryption algorithms.""" + AES_CBC_128 = ('aes-cbc-128', 'AES-CBC', 16) + AES_CBC_192 = ('aes-cbc-192', 'AES-CBC', 24) + AES_CBC_256 = ('aes-cbc-256', 'AES-CBC', 32) + AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20) + + def __init__(self, alg_name, scapy_name, key_len): + self.alg_name = alg_name + self.scapy_name = scapy_name + self.key_len = key_len + + +class IntegAlg(Enum): + """Integrity algorithms.""" + SHA1_96 = ('sha1-96', 'HMAC-SHA1-96', 20) + SHA_256_128 = ('sha-256-128', 'SHA2-256-128', 32) + SHA_384_192 = ('sha-384-192', 'SHA2-384-192', 48) + SHA_512_256 = ('sha-512-256', 'SHA2-512-256', 64) + AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20) + + def __init__(self, alg_name, scapy_name, key_len): + self.alg_name = alg_name + self.scapy_name = scapy_name + self.key_len = key_len + + +class VipsecApproxSetupEnvHelper(VppSetupEnvHelper): + DEFAULT_IPSEC_VNF_CFG = { + 'crypto_type': 'SW_cryptodev', + 'rxq': 1, + 'worker_config': '1C/1T', + 'worker_threads': 1, + } + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(VipsecApproxSetupEnvHelper, self).__init__( + vnfd_helper, ssh_helper, scenario_helper) + + def _get_crypto_type(self): + vnf_cfg = self.scenario_helper.options.get('vnf_config', + self.DEFAULT_IPSEC_VNF_CFG) + return vnf_cfg.get('crypto_type', 'SW_cryptodev') + + def _get_crypto_algorithms(self): + vpp_cfg = self.scenario_helper.all_options.get('vpp_config', {}) + return vpp_cfg.get('crypto_algorithms', 'aes-gcm') + + def _get_n_tunnels(self): + vpp_cfg = self.scenario_helper.all_options.get('vpp_config', {}) + return vpp_cfg.get('tunnels', 1) + + def _get_n_connections(self): + try: + flow_cfg = self.scenario_helper.all_options['flow'] + return flow_cfg['count'] + except KeyError: + raise KeyError("Missing flow definition in scenario section" + + " of the task definition file") + + def _get_flow_src_start_ip(self): + node_name = self.find_encrypted_data_interface()["node_name"] + try: + flow_cfg = self.scenario_helper.all_options['flow'] + src_ips = flow_cfg['src_ip'] + dst_ips = flow_cfg['dst_ip'] + except KeyError: + raise KeyError("Missing flow definition in scenario section" + + " of the task definition file") + + for src, dst in zip(src_ips, dst_ips): + flow_src_start_ip, _ = src.split('-') + flow_dst_start_ip, _ = dst.split('-') + + if node_name == "vnf__0": + return flow_src_start_ip + elif node_name == "vnf__1": + return flow_dst_start_ip + + def _get_flow_dst_start_ip(self): + node_name = self.find_encrypted_data_interface()["node_name"] + try: + flow_cfg = self.scenario_helper.all_options['flow'] + src_ips = flow_cfg['src_ip'] + dst_ips = flow_cfg['dst_ip'] + except KeyError: + raise KeyError("Missing flow definition in scenario section" + + " of the task definition file") + + for src, dst in zip(src_ips, dst_ips): + flow_src_start_ip, _ = src.split('-') + flow_dst_start_ip, _ = dst.split('-') + + if node_name == "vnf__0": + return flow_dst_start_ip + elif node_name == "vnf__1": + return flow_src_start_ip + + def build_config(self): + vnf_cfg = self.scenario_helper.options.get('vnf_config', + self.DEFAULT_IPSEC_VNF_CFG) + rxq = vnf_cfg.get('rxq', 1) + phy_cores = vnf_cfg.get('worker_threads', 1) + # worker_config = vnf_cfg.get('worker_config', '1C/1T').split('/')[1].lower() + + vpp_cfg = self.create_startup_configuration_of_vpp() + self.add_worker_threads_and_rxqueues(vpp_cfg, phy_cores, rxq) + self.add_pci_devices(vpp_cfg) + + frame_size_cfg = self.scenario_helper.all_options.get('framesize', {}) + uplink_cfg = frame_size_cfg.get('uplink', {}) + downlink_cfg = frame_size_cfg.get('downlink', {}) + framesize = min(self.calculate_frame_size(uplink_cfg), + self.calculate_frame_size(downlink_cfg)) + if framesize < 1522: + vpp_cfg.add_dpdk_no_multi_seg() + + crypto_algorithms = self._get_crypto_algorithms() + if crypto_algorithms == 'aes-gcm': + self.add_dpdk_cryptodev(vpp_cfg, 'aesni_gcm', phy_cores) + elif crypto_algorithms == 'cbc-sha1': + self.add_dpdk_cryptodev(vpp_cfg, 'aesni_mb', phy_cores) + + vpp_cfg.add_dpdk_dev_default_rxd(2048) + vpp_cfg.add_dpdk_dev_default_txd(2048) + self.apply_config(vpp_cfg, True) + self.update_vpp_interface_data() + + def setup_vnf_environment(self): + resource = super(VipsecApproxSetupEnvHelper, + self).setup_vnf_environment() + + self.start_vpp_service() + # for QAT device DH895xCC, the number of VFs is required as 32 + if self._get_crypto_type() == 'HW_cryptodev': + sriov_numvfs = self.get_sriov_numvfs( + self.find_encrypted_data_interface()["vpci"]) + if sriov_numvfs != 32: + self.crypto_device_init( + self.find_encrypted_data_interface()["vpci"], 32) + + self._update_vnfd_helper(self.sys_cores.get_cpu_layout()) + self.update_vpp_interface_data() + self.iface_update_numa() + + return resource + + @staticmethod + def calculate_frame_size(frame_cfg): + if not frame_cfg: + return 64 + + imix_count = {size.upper().replace('B', ''): int(weight) + for size, weight in frame_cfg.items()} + imix_sum = sum(imix_count.values()) + if imix_sum <= 0: + return 64 + packets_total = sum([int(size) * weight + for size, weight in imix_count.items()]) + return packets_total / imix_sum + + def check_status(self): + ipsec_created = False + cmd = "vppctl show int" + _, stdout, _ = self.ssh_helper.execute(cmd) + entries = re.split(r"\n+", stdout) + tmp = [re.split(r"\s\s+", entry, 5) for entry in entries] + + for item in tmp: + if isinstance(item, list): + if item[0] and item[0] != 'local0': + if "ipsec" in item[0] and not ipsec_created: + ipsec_created = True + if len(item) > 2 and item[2] == 'down': + return False + return ipsec_created + + def get_vpp_statistics(self): + cmd = "vppctl show int {intf}" + result = {} + for interface in self.vnfd_helper.interfaces: + iface_name = self.get_value_by_interface_key( + interface["virtual-interface"]["ifname"], "vpp_name") + command = cmd.format(intf=iface_name) + _, stdout, _ = self.ssh_helper.execute(command) + result.update( + self.parser_vpp_stats(interface["virtual-interface"]["ifname"], + iface_name, stdout)) + self.ssh_helper.execute("vppctl clear interfaces") + return result + + @staticmethod + def parser_vpp_stats(interface, iface_name, stats): + packets_in = 0 + packets_fwd = 0 + packets_dropped = 0 + result = {} + + entries = re.split(r"\n+", stats) + tmp = [re.split(r"\s\s+", entry, 5) for entry in entries] + + for item in tmp: + if isinstance(item, list): + if item[0] == iface_name and len(item) >= 5: + if item[3] == 'rx packets': + packets_in = int(item[4]) + elif item[4] == 'rx packets': + packets_in = int(item[5]) + elif len(item) == 3: + if item[1] == 'tx packets': + packets_fwd = int(item[2]) + elif item[1] == 'drops' or item[1] == 'rx-miss': + packets_dropped = int(item[2]) + if packets_dropped == 0 and packets_in > 0 and packets_fwd > 0: + packets_dropped = abs(packets_fwd - packets_in) + + result[interface] = { + 'packets_in': packets_in, + 'packets_fwd': packets_fwd, + 'packets_dropped': packets_dropped, + } + + return result + + def create_ipsec_tunnels(self): + self.initialize_ipsec() + + # TODO generate the same key + crypto_algorithms = self._get_crypto_algorithms() + if crypto_algorithms == 'aes-gcm': + encr_alg = CryptoAlg.AES_GCM_128 + auth_alg = IntegAlg.AES_GCM_128 + encr_key = 'LNYZXMBQDKESNLREHJMS' + auth_key = 'SWGLDTYZSQKVBZZMPIEV' + elif crypto_algorithms == 'cbc-sha1': + encr_alg = CryptoAlg.AES_CBC_128 + auth_alg = IntegAlg.SHA1_96 + encr_key = 'IFEMSHYLCZIYFUTT' + auth_key = 'PEALEIPSCPTRHYJSDXLY' + + self.execute_script("enable_dpdk_traces.vat", json_out=False) + self.execute_script("enable_vhost_user_traces.vat", json_out=False) + self.execute_script("enable_memif_traces.vat", json_out=False) + + node_name = self.find_encrypted_data_interface()["node_name"] + n_tunnels = self._get_n_tunnels() + n_connections = self._get_n_connections() + flow_dst_start_ip = self._get_flow_dst_start_ip() + if node_name == "vnf__0": + self.vpp_create_ipsec_tunnels( + self.find_encrypted_data_interface()["local_ip"], + self.find_encrypted_data_interface()["peer_intf"]["local_ip"], + self.find_encrypted_data_interface()["ifname"], + n_tunnels, n_connections, encr_alg, encr_key, auth_alg, + auth_key, flow_dst_start_ip) + elif node_name == "vnf__1": + self.vpp_create_ipsec_tunnels( + self.find_encrypted_data_interface()["local_ip"], + self.find_encrypted_data_interface()["peer_intf"]["local_ip"], + self.find_encrypted_data_interface()["ifname"], + n_tunnels, n_connections, encr_alg, encr_key, auth_alg, + auth_key, flow_dst_start_ip, 20000, 10000) + + def find_raw_data_interface(self): + try: + return self.vnfd_helper.find_virtual_interface(vld_id="uplink_0") + except KeyError: + return self.vnfd_helper.find_virtual_interface(vld_id="downlink_0") + + def find_encrypted_data_interface(self): + return self.vnfd_helper.find_virtual_interface(vld_id="ciphertext") + + def create_startup_configuration_of_vpp(self): + vpp_config_generator = VppConfigGenerator() + vpp_config_generator.add_unix_log() + vpp_config_generator.add_unix_cli_listen() + vpp_config_generator.add_unix_nodaemon() + vpp_config_generator.add_unix_coredump() + vpp_config_generator.add_dpdk_socketmem('1024,1024') + vpp_config_generator.add_dpdk_no_tx_checksum_offload() + vpp_config_generator.add_dpdk_log_level('debug') + for interface in self.vnfd_helper.interfaces: + vpp_config_generator.add_dpdk_uio_driver( + interface["virtual-interface"]["driver"]) + vpp_config_generator.add_heapsize('4G') + # TODO Enable configuration depend on VPP version + vpp_config_generator.add_statseg_size('4G') + vpp_config_generator.add_plugin('disable', ['default']) + vpp_config_generator.add_plugin('enable', ['dpdk_plugin.so']) + vpp_config_generator.add_ip6_hash_buckets('2000000') + vpp_config_generator.add_ip6_heap_size('4G') + vpp_config_generator.add_ip_heap_size('4G') + return vpp_config_generator + + def add_worker_threads_and_rxqueues(self, vpp_cfg, phy_cores, + rx_queues=None): + thr_count_int = phy_cores + cpu_count_int = phy_cores + num_mbufs_int = 32768 + + numa_list = [] + + if_list = [self.find_encrypted_data_interface()["ifname"], + self.find_raw_data_interface()["ifname"]] + for if_key in if_list: + try: + numa_list.append( + self.get_value_by_interface_key(if_key, 'numa_node')) + except KeyError: + pass + numa_cnt_mc = Counter(numa_list).most_common() + + if numa_cnt_mc and numa_cnt_mc[0][0] is not None and \ + numa_cnt_mc[0][0] != -1: + numa = numa_cnt_mc[0][0] + elif len(numa_cnt_mc) > 1 and numa_cnt_mc[0][0] == -1: + numa = numa_cnt_mc[1][0] + else: + numa = 0 + + try: + smt_used = self.sys_cores.is_smt_enabled() + except KeyError: + smt_used = False + + cpu_main = self.sys_cores.cpu_list_per_node_str(numa, skip_cnt=1, + cpu_cnt=1) + cpu_wt = self.sys_cores.cpu_list_per_node_str(numa, skip_cnt=2, + cpu_cnt=cpu_count_int, + smt_used=smt_used) + + if smt_used: + thr_count_int = 2 * cpu_count_int + + if rx_queues is None: + rxq_count_int = int(thr_count_int / 2) + else: + rxq_count_int = rx_queues + + if rxq_count_int == 0: + rxq_count_int = 1 + + num_mbufs_int = num_mbufs_int * rxq_count_int + + vpp_cfg.add_cpu_main_core(cpu_main) + vpp_cfg.add_cpu_corelist_workers(cpu_wt) + vpp_cfg.add_dpdk_dev_default_rxq(rxq_count_int) + vpp_cfg.add_dpdk_num_mbufs(num_mbufs_int) + + def add_pci_devices(self, vpp_cfg): + pci_devs = [self.find_encrypted_data_interface()["vpci"], + self.find_raw_data_interface()["vpci"]] + vpp_cfg.add_dpdk_dev(*pci_devs) + + def add_dpdk_cryptodev(self, vpp_cfg, sw_pmd_type, count): + crypto_type = self._get_crypto_type() + smt_used = self.sys_cores.is_smt_enabled() + cryptodev = self.find_encrypted_data_interface()["vpci"] + socket_id = self.get_value_by_interface_key( + self.find_encrypted_data_interface()["ifname"], "numa_node") + + if smt_used: + thr_count_int = count * 2 + if crypto_type == 'HW_cryptodev': + vpp_cfg.add_dpdk_cryptodev(thr_count_int, cryptodev) + else: + vpp_cfg.add_dpdk_sw_cryptodev(sw_pmd_type, socket_id, + thr_count_int) + else: + thr_count_int = count + if crypto_type == 'HW_cryptodev': + vpp_cfg.add_dpdk_cryptodev(thr_count_int, cryptodev) + else: + vpp_cfg.add_dpdk_sw_cryptodev(sw_pmd_type, socket_id, + thr_count_int) + + def initialize_ipsec(self): + flow_src_start_ip = self._get_flow_src_start_ip() + + self.set_interface_state( + self.find_encrypted_data_interface()["ifname"], 'up') + self.set_interface_state(self.find_raw_data_interface()["ifname"], + 'up') + self.vpp_interfaces_ready_wait() + self.vpp_set_interface_mtu( + self.find_encrypted_data_interface()["ifname"]) + self.vpp_set_interface_mtu(self.find_raw_data_interface()["ifname"]) + self.vpp_interfaces_ready_wait() + + self.set_ip(self.find_encrypted_data_interface()["ifname"], + self.find_encrypted_data_interface()["local_ip"], 24) + self.set_ip(self.find_raw_data_interface()["ifname"], + self.find_raw_data_interface()["local_ip"], + 24) + + self.add_arp_on_dut(self.find_encrypted_data_interface()["ifname"], + self.find_encrypted_data_interface()["peer_intf"][ + "local_ip"], + self.find_encrypted_data_interface()["peer_intf"][ + "local_mac"]) + self.add_arp_on_dut(self.find_raw_data_interface()["ifname"], + self.find_raw_data_interface()["peer_intf"][ + "local_ip"], + self.find_raw_data_interface()["peer_intf"][ + "local_mac"]) + + self.vpp_route_add(flow_src_start_ip, 8, + self.find_raw_data_interface()["peer_intf"][ + "local_ip"], + self.find_raw_data_interface()["ifname"]) + + +class VipsecApproxVnf(SampleVNF): + """ This class handles vIPSEC VNF model-driver definitions """ + + APP_NAME = 'vIPSEC' + APP_WORD = 'vipsec' + WAIT_TIME = 20 + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = VipsecApproxSetupEnvHelper + super(VipsecApproxVnf, self).__init__( + name, vnfd, setup_env_helper_type, + resource_helper_type) + + def _run(self): + # we can't share ssh paramiko objects to force new connection + self.ssh_helper.drop_connection() + # kill before starting + self.setup_helper.kill_vnf() + self._build_config() + self.setup_helper.create_ipsec_tunnels() + + def wait_for_instantiate(self): + time.sleep(self.WAIT_TIME) + while True: + status = self.setup_helper.check_status() + if not self._vnf_process.is_alive() and not status: + raise RuntimeError("%s VNF process died." % self.APP_NAME) + LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) + time.sleep(self.WAIT_TIME_FOR_SCRIPT) + status = self.setup_helper.check_status() + if status: + LOG.info("%s VNF is up and running.", self.APP_NAME) + self._vnf_up_post() + return self._vnf_process.exitcode + + def terminate(self): + self.setup_helper.kill_vnf() + self._tear_down() + self.resource_helper.stop_collect() + if self._vnf_process is not None: + # be proper and join first before we kill + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() + + def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process, 0.01) + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + result = {"physical_node": physical_node} + result["collect_stats"] = self.setup_helper.get_vpp_statistics() + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) + return result diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 285ead3b6..3507315f2 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import re import select import socket import time + from collections import OrderedDict, namedtuple from contextlib import contextmanager from itertools import repeat, chain @@ -30,12 +31,12 @@ import six from six.moves import cStringIO from six.moves import zip, StringIO -from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file from yardstick.common import utils from yardstick.common.utils import SocketTopology, join_non_strings, try_int from yardstick.network_services.helpers.iniparser import ConfigParser from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper +from yardstick.network_services import constants PROX_PORT = 8474 @@ -44,8 +45,9 @@ SECTION_CONTENTS = 1 LOG = logging.getLogger(__name__) LOG.setLevel(logging.DEBUG) +LOG_RESULT = logging.getLogger('yardstick') +LOG_RESULT.setLevel(logging.DEBUG) -TEN_GIGABIT = 1e10 BITS_PER_BYTE = 8 RETRY_SECONDS = 60 RETRY_INTERVAL = 1 @@ -124,7 +126,8 @@ class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')): class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,' 'delta_tx,delta_tsc,' - 'latency,rx_total,tx_total,pps')): + 'latency,rx_total,tx_total,' + 'requested_pps')): @property def pkt_loss(self): try: @@ -133,11 +136,16 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ return 100.0 @property - def mpps(self): + def tx_mpps(self): # calculate the effective throughput in Mpps return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6 @property + def rx_mpps(self): + # calculate the effective throughput in Mpps + return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6 + + @property def can_be_lost(self): return int(self.tx_total * self.tolerated / 1e2) @@ -163,11 +171,12 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ ] samples = { - "Throughput": self.mpps, + "Throughput": self.rx_mpps, + "RxThroughput": self.rx_mpps, "DropPackets": pkt_loss, "CurrentDropPackets": pkt_loss, - "TxThroughput": self.pps / 1e6, - "RxThroughput": self.mpps, + "RequestedTxThroughput": self.requested_pps / 1e6, + "TxThroughput": self.tx_mpps, "PktSize": pkt_size, } if port_samples: @@ -178,11 +187,12 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ def log_data(self, logger=None): if logger is None: - logger = LOG + logger = LOG_RESULT template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)" - logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost) - logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps) + logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost) + logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f", + self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps) class PacketDump(object): @@ -289,7 +299,7 @@ class ProxSocketHelper(object): if mode != 'pktdump': # Regular 1-line message. Stop reading from the socket. LOG.debug("Regular response read") - return ret_str + return ret_str, True LOG.debug("Packet dump header read: [%s]", ret_str) @@ -310,13 +320,34 @@ class ProxSocketHelper(object): # Return boolean instead of string to signal # successful reception of the packet dump. LOG.debug("Packet dump stored, returning") - return True + return True, False index = data_end + 1 - return ret_str + return ret_str, False - def get_data(self, pkt_dump_only=False, timeout=1): + def get_string(self, pkt_dump_only=False, timeout=0.01): + + def is_ready_string(): + # recv() is blocking, so avoid calling it when no data is waiting. + ready = select.select([self._sock], [], [], timeout) + return bool(ready[0]) + + status = False + ret_str = "" + while status is False: + for status in iter(is_ready_string, False): + decoded_data = self._sock.recv(256).decode('utf-8') + ret_str, done = self._parse_socket_data(decoded_data, + pkt_dump_only) + if (done): + status = True + break + + LOG.debug("Received data from socket: [%s]", ret_str) + return status, ret_str + + def get_data(self, pkt_dump_only=False, timeout=10.0): """ read data from the socket """ # This method behaves slightly differently depending on whether it is @@ -353,7 +384,9 @@ class ProxSocketHelper(object): ret_str = "" for status in iter(is_ready, False): decoded_data = self._sock.recv(256).decode('utf-8') - ret_str = self._parse_socket_data(decoded_data, pkt_dump_only) + ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only) + if (done): + break LOG.debug("Received data from socket: [%s]", ret_str) return ret_str if status else '' @@ -383,13 +416,17 @@ class ProxSocketHelper(object): """ stop all cores on the remote instance """ LOG.debug("Stop all") self.put_command("stop all\n") - time.sleep(3) def stop(self, cores, task=''): """ stop specific cores on the remote instance """ - LOG.debug("Stopping cores %s", cores) - self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task)) - time.sleep(3) + + tmpcores = [] + for core in cores: + if core not in tmpcores: + tmpcores.append(core) + + LOG.debug("Stopping cores %s", tmpcores) + self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task)) def start_all(self): """ start all cores on the remote instance """ @@ -398,15 +435,19 @@ class ProxSocketHelper(object): def start(self, cores): """ start specific cores on the remote instance """ - LOG.debug("Starting cores %s", cores) - self.put_command("start {}\n".format(join_non_strings(',', cores))) - time.sleep(3) + + tmpcores = [] + for core in cores: + if core not in tmpcores: + tmpcores.append(core) + + LOG.debug("Starting cores %s", tmpcores) + self.put_command("start {}\n".format(join_non_strings(',', tmpcores))) def reset_stats(self): """ reset the statistics on the remote instance """ LOG.debug("Reset stats") self.put_command("reset stats\n") - time.sleep(1) def _run_template_over_cores(self, template, cores, *args): for core in cores: @@ -417,7 +458,6 @@ class ProxSocketHelper(object): LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size) pkt_size -= 4 self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size) - time.sleep(1) def set_value(self, cores, offset, value, length): """ set value on the remote instance """ @@ -467,13 +507,14 @@ class ProxSocketHelper(object): core_data['current'] = core_data[key1] + core_data[key2] self.set_speed(core_data['cores'], core_data['current']) - def set_pps(self, cores, pps, pkt_size): + def set_pps(self, cores, pps, pkt_size, + line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)): """ set packets per second for specific cores on the remote instance """ msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)" LOG.debug(msg, cores, pps, pkt_size) # speed in percent of line-rate - speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE + speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE self._run_template_over_cores("speed {} 0 {}\n", cores, speed) def lat_stats(self, cores, task=0): @@ -520,6 +561,174 @@ class ProxSocketHelper(object): tsc = int(ret[3]) return rx, tx, drop, tsc + def irq_core_stats(self, cores_tasks): + """ get IRQ stats per core""" + + stat = {} + core = 0 + task = 0 + for core, task in cores_tasks: + self.put_command("stats task.core({}).task({}).max_irq,task.core({}).task({}).irq(0)," + "task.core({}).task({}).irq(1),task.core({}).task({}).irq(2)," + "task.core({}).task({}).irq(3),task.core({}).task({}).irq(4)," + "task.core({}).task({}).irq(5),task.core({}).task({}).irq(6)," + "task.core({}).task({}).irq(7),task.core({}).task({}).irq(8)," + "task.core({}).task({}).irq(9),task.core({}).task({}).irq(10)," + "task.core({}).task({}).irq(11),task.core({}).task({}).irq(12)" + "\n".format(core, task, core, task, core, task, core, task, + core, task, core, task, core, task, core, task, + core, task, core, task, core, task, core, task, + core, task, core, task)) + in_data_str = self.get_data().split(",") + ret = [try_int(s, 0) for s in in_data_str] + key = "core_" + str(core) + try: + stat[key] = {"cpu": core, "max_irq": ret[0], "bucket_0" : ret[1], + "bucket_1" : ret[2], "bucket_2" : ret[3], + "bucket_3" : ret[4], "bucket_4" : ret[5], + "bucket_5" : ret[6], "bucket_6" : ret[7], + "bucket_7" : ret[8], "bucket_8" : ret[9], + "bucket_9" : ret[10], "bucket_10" : ret[11], + "bucket_11" : ret[12], "bucket_12" : ret[13], + "overflow": ret[10] + ret[11] + ret[12] + ret[13]} + except (KeyError, IndexError): + LOG.error("Corrupted PACKET %s", in_data_str) + + return stat + + def multi_port_stats(self, ports): + """get counter values from all ports at once""" + + ports_str = ",".join(map(str, ports)) + ports_all_data = [] + tot_result = [0] * len(ports) + + port_index = 0 + while (len(ports) is not len(ports_all_data)): + self.put_command("multi port stats {}\n".format(ports_str)) + status, ports_all_data_str = self.get_string() + + if not status: + return False, [] + + ports_all_data = ports_all_data_str.split(";") + + if len(ports) is len(ports_all_data): + for port_data_str in ports_all_data: + + tmpdata = [] + try: + tmpdata = [try_int(s, 0) for s in port_data_str.split(",")] + except (IndexError, TypeError): + LOG.error("Unpacking data error %s", port_data_str) + return False, [] + + if (len(tmpdata) < 6) or tmpdata[0] not in ports: + LOG.error("Corrupted PACKET %s - retrying", port_data_str) + return False, [] + else: + tot_result[port_index] = tmpdata + port_index = port_index + 1 + else: + LOG.error("Empty / too much data - retry -%s-", ports_all_data) + return False, [] + + LOG.debug("Multi port packet ..OK.. %s", tot_result) + return True, tot_result + + @staticmethod + def multi_port_stats_tuple(stats, ports): + """ + Create a statistics tuple from port stats. + + Returns a dict with contains the port stats indexed by port name + + :param stats: (List) - List of List of port stats in pps + :param ports (Iterator) - to List of Ports + + :return: (Dict) of port stats indexed by port_name + """ + + samples = {} + port_names = {} + try: + port_names = {port_num: port_name for port_name, port_num in ports} + except (TypeError, IndexError, KeyError): + LOG.critical("Ports are not initialized or number of port is ZERO ... CRITICAL ERROR") + return {} + + try: + for stat in stats: + port_num = stat[0] + samples[port_names[port_num]] = { + "in_packets": stat[1], + "out_packets": stat[2]} + except (TypeError, IndexError, KeyError): + LOG.error("Ports data and samples data is incompatable ....") + return {} + + return samples + + @staticmethod + def multi_port_stats_diff(prev_stats, new_stats, hz): + """ + Create a statistics tuple from difference between prev port stats + and current port stats. And store results in pps. + + :param prev_stats: (List) - Previous List of port statistics + :param new_stats: (List) - Current List of port statistics + :param hz (float) - speed of system in Hz + + :return: sample (List) - Difference of prev_port_stats and + new_port_stats in pps + """ + + RX_TOTAL_INDEX = 1 + TX_TOTAL_INDEX = 2 + TSC_INDEX = 5 + + stats = [] + + if len(prev_stats) is not len(new_stats): + for port_index, stat in enumerate(new_stats): + stats.append([port_index, float(0), float(0), 0, 0, 0]) + return stats + + try: + for port_index, stat in enumerate(new_stats): + if stat[RX_TOTAL_INDEX] > prev_stats[port_index][RX_TOTAL_INDEX]: + rx_total = stat[RX_TOTAL_INDEX] - \ + prev_stats[port_index][RX_TOTAL_INDEX] + else: + rx_total = stat[RX_TOTAL_INDEX] + + if stat[TX_TOTAL_INDEX] > prev_stats[port_index][TX_TOTAL_INDEX]: + tx_total = stat[TX_TOTAL_INDEX] - prev_stats[port_index][TX_TOTAL_INDEX] + else: + tx_total = stat[TX_TOTAL_INDEX] + + if stat[TSC_INDEX] > prev_stats[port_index][TSC_INDEX]: + tsc = stat[TSC_INDEX] - prev_stats[port_index][TSC_INDEX] + else: + tsc = stat[TSC_INDEX] + + if tsc is 0: + rx_total = tx_total = float(0) + else: + if hz is 0: + LOG.error("HZ is ZERO ..") + rx_total = tx_total = float(0) + else: + rx_total = float(rx_total * hz / tsc) + tx_total = float(tx_total * hz / tsc) + + stats.append([port_index, rx_total, tx_total, 0, 0, tsc]) + except (TypeError, IndexError, KeyError): + stats = [] + LOG.info("Current Port Stats incompatable to previous Port stats .. Discarded") + + return stats + def port_stats(self, ports): """get counter values from a specific port""" tot_result = [0] * 12 @@ -580,7 +789,6 @@ class ProxSocketHelper(object): self.put_command("quit_force\n") time.sleep(3) - _LOCAL_OBJECT = object() @@ -662,6 +870,30 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): file_str[1] = self.additional_files[base_name] return '"'.join(file_str) + def _make_core_list(self, inputStr): + + my_input = inputStr.split("core ", 1)[1] + ok_list = set() + + substrs = [x.strip() for x in my_input.split(',')] + for i in substrs: + try: + ok_list.add(int(i)) + + except ValueError: + try: + substr = [int(k.strip()) for k in i.split('-')] + if len(substr) > 1: + startstr = substr[0] + endstr = substr[len(substr) - 1] + for z in range(startstr, endstr + 1): + ok_list.add(z) + except ValueError: + LOG.error("Error in cores list ... resuming ") + return ok_list + + return ok_list + def generate_prox_config_file(self, config_path): sections = [] prox_config = ConfigParser(config_path, sections) @@ -681,6 +913,18 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): if section_data[0] == "mac": section_data[1] = "hardware" + # adjust for range of cores + new_sections = [] + for section_name, section in sections: + if section_name.startswith('core') and section_name.find('$') == -1: + core_list = self._make_core_list(section_name) + for core in core_list: + new_sections.append(["core " + str(core), section]) + else: + new_sections.append([section_name, section]) + + sections = new_sections + # search for dst mac for _, section in sections: for section_data in section: @@ -699,6 +943,20 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): mac = intf["virtual-interface"]["dst_mac"] section_data[1] = mac + if item_val.startswith("@@src_mac"): + tx_port_iter = re.finditer(r'\d+', item_val) + tx_port_no = int(next(tx_port_iter).group(0)) + intf = self.vnfd_helper.find_interface_by_port(tx_port_no) + mac = intf["virtual-interface"]["local_mac"] + section_data[1] = mac.replace(":", " ", 6) + + if item_key == "src mac" and item_val.startswith("@@"): + tx_port_iter = re.finditer(r'\d+', item_val) + tx_port_no = int(next(tx_port_iter).group(0)) + intf = self.vnfd_helper.find_interface_by_port(tx_port_no) + mac = intf["virtual-interface"]["local_mac"] + section_data[1] = mac + # if addition file specified in prox config if not self.additional_files: return sections @@ -798,7 +1056,7 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): options = self.scenario_helper.options config_path = options['prox_config'] config_file = os.path.basename(config_path) - config_path = find_relative_file(config_path, task_path) + config_path = utils.find_relative_file(config_path, task_path) self.additional_files = {} try: @@ -815,7 +1073,7 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): prox_files = [prox_files] for key_prox_file in prox_files: base_prox_file = os.path.basename(key_prox_file) - key_prox_path = find_relative_file(key_prox_file, task_path) + key_prox_path = utils.find_relative_file(key_prox_file, task_path) remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file) self.additional_files[base_prox_file] = remote_prox_file @@ -873,6 +1131,8 @@ class ProxResourceHelper(ClientResourceHelper): self.step_delta = 1 self.step_time = 0.5 self._test_type = None + self.prev_multi_port = [] + self.prev_hz = 0 @property def sut(self): @@ -901,7 +1161,7 @@ class ProxResourceHelper(ClientResourceHelper): def _run_traffic_once(self, traffic_profile): traffic_profile.execute_traffic(self) - if traffic_profile.done: + if traffic_profile.done.is_set(): self._queue.put({'done': True}) LOG.debug("tg_prox done") self._terminated.value = 1 @@ -911,11 +1171,40 @@ class ProxResourceHelper(ClientResourceHelper): def collect_collectd_kpi(self): return self._collect_resource_kpi() + def collect_live_stats(self): + ports = [] + for _, port_num in self.vnfd_helper.ports_iter(): + ports.append(port_num) + + ok, curr_port_stats = self.sut.multi_port_stats(ports) + if not ok: + return False, {} + + hz = self.sut.hz() + if hz is 0: + hz = self.prev_hz + else: + self.prev_hz = hz + + new_all_port_stats = \ + self.sut.multi_port_stats_diff(self.prev_multi_port, curr_port_stats, hz) + + self.prev_multi_port = curr_port_stats + + live_stats = self.sut.multi_port_stats_tuple(new_all_port_stats, + self.vnfd_helper.ports_iter()) + return True, live_stats + def collect_kpi(self): result = super(ProxResourceHelper, self).collect_kpi() # add in collectd kpis manually if result: result['collect_stats'] = self._collect_resource_kpi() + + ok, live_stats = self.collect_live_stats() + if ok: + result.update({'live_stats': live_stats}) + return result def terminate(self): @@ -929,6 +1218,7 @@ class ProxResourceHelper(ClientResourceHelper): func = getattr(self.sut, cmd, None) if func: return func(*args, **kwargs) + return None def _connect(self, client=None): """Run and connect to prox on the remote system """ @@ -967,12 +1257,13 @@ class ProxResourceHelper(ClientResourceHelper): class ProxDataHelper(object): - def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss): + def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed): super(ProxDataHelper, self).__init__() self.vnfd_helper = vnfd_helper self.sut = sut self.pkt_size = pkt_size self.value = value + self.line_speed = line_speed self.tolerated_loss = tolerated_loss self.port_count = len(self.vnfd_helper.port_pairs.all_ports) self.tsc_hz = None @@ -984,32 +1275,71 @@ class ProxDataHelper(object): @property def totals_and_pps(self): if self._totals_and_pps is None: - rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8] - pps = self.value / 100.0 * self.line_rate_to_pps() - self._totals_and_pps = rx_total, tx_total, pps + rx_total = tx_total = 0 + ok = False + timeout = time.time() + constants.RETRY_TIMEOUT + while not ok: + ok, all_ports = self.sut.multi_port_stats([ + self.vnfd_helper.port_num(port_name) + for port_name in self.vnfd_helper.port_pairs.all_ports]) + if time.time() > timeout: + break + if ok: + for port in all_ports: + rx_total = rx_total + port[1] + tx_total = tx_total + port[2] + requested_pps = self.value / 100.0 * self.line_rate_to_pps() + self._totals_and_pps = rx_total, tx_total, requested_pps return self._totals_and_pps @property def rx_total(self): - return self.totals_and_pps[0] + try: + ret_val = self.totals_and_pps[0] + except (AttributeError, ValueError, TypeError, LookupError): + ret_val = 0 + return ret_val @property def tx_total(self): - return self.totals_and_pps[1] + try: + ret_val = self.totals_and_pps[1] + except (AttributeError, ValueError, TypeError, LookupError): + ret_val = 0 + return ret_val @property - def pps(self): - return self.totals_and_pps[2] + def requested_pps(self): + try: + ret_val = self.totals_and_pps[2] + except (AttributeError, ValueError, TypeError, LookupError): + ret_val = 0 + return ret_val @property def samples(self): samples = {} + ports = [] + port_names = {} for port_name, port_num in self.vnfd_helper.ports_iter(): - port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8] - samples[port_name] = { - "in_packets": port_rx_total, - "out_packets": port_tx_total, - } + ports.append(port_num) + port_names[port_num] = port_name + + ok = False + timeout = time.time() + constants.RETRY_TIMEOUT + while not ok: + ok, results = self.sut.multi_port_stats(ports) + if time.time() > timeout: + break + if ok: + for result in results: + port_num = result[0] + try: + samples[port_names[port_num]] = { + "in_packets": result[1], + "out_packets": result[2]} + except (IndexError, KeyError): + pass return samples def __enter__(self): @@ -1032,7 +1362,7 @@ class ProxDataHelper(object): self.latency, self.rx_total, self.tx_total, - self.pps, + self.requested_pps, ) self.result_tuple.log_data() @@ -1051,9 +1381,7 @@ class ProxDataHelper(object): self.tsc_hz = float(self.sut.hz()) def line_rate_to_pps(self): - # NOTE: to fix, don't hardcode 10Gb/s - return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20) - + return self.port_count * self.line_speed / BITS_PER_BYTE / (self.pkt_size + 20) class ProxProfileHelper(object): @@ -1113,6 +1441,7 @@ class ProxProfileHelper(object): self.sut.set_pkt_size(self.test_cores, pkt_size) self.sut.set_speed(self.test_cores, value) self.sut.start_all() + time.sleep(1) yield finally: self.sut.stop_all() @@ -1127,15 +1456,37 @@ class ProxProfileHelper(object): for key, value in section: if key == "mode" and value == mode: core_tuple = CoreSocketTuple(section_name) - core = core_tuple.find_in_topology(self.cpu_topology) + core = core_tuple.core_id cores.append(core) return cores - def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): - data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss) + def pct_10gbps(self, percent, line_speed): + """Get rate in percent of 10 Gbps. + + Returns the rate in percent of 10 Gbps. + For instance 100.0 = 10 Gbps; 400.0 = 40 Gbps. + + This helper method isrequired when setting interface_speed option in + the testcase because NSB/PROX considers 10Gbps as 100% of line rate, + this means that the line rate must be expressed as a percentage of + 10Gbps. + + :param percent: (float) Percent of line rate (100.0 = line rate). + :param line_speed: (int) line rate speed, in bits per second. + + :return: (float) Represents the rate in percent of 10Gbps. + """ + return (percent * line_speed / ( + constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)) - with data_helper, self.traffic_context(pkt_size, value): + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0, + line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)): + data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, + value, tolerated_loss, line_speed) + + with data_helper, self.traffic_context(pkt_size, + self.pct_10gbps(value, line_speed)): with data_helper.measure_tot_stats(): time.sleep(duration) # Getting statistics to calculate PPS at right speed.... @@ -1149,6 +1500,10 @@ class ProxProfileHelper(object): :return: return lat_min, lat_max, lat_avg :rtype: list """ + + if not self._latency_cores: + self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE) + if self._latency_cores: return self.sut.lat_stats(self._latency_cores) return [] @@ -1198,12 +1553,12 @@ class ProxMplsProfileHelper(ProxProfileHelper): if item_value.startswith("tag"): core_tuple = CoreSocketTuple(section_name) - core_tag = core_tuple.find_in_topology(self.cpu_topology) + core_tag = core_tuple.core_id cores_tagged.append(core_tag) elif item_value.startswith("udp"): core_tuple = CoreSocketTuple(section_name) - core_udp = core_tuple.find_in_topology(self.cpu_topology) + core_udp = core_tuple.core_id cores_plain.append(core_udp) return cores_tagged, cores_plain @@ -1219,6 +1574,7 @@ class ProxMplsProfileHelper(ProxProfileHelper): ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20) self.sut.set_speed(self.plain_cores, value * ratio) self.sut.start_all() + time.sleep(1) yield finally: self.sut.stop_all() @@ -1276,23 +1632,23 @@ class ProxBngProfileHelper(ProxProfileHelper): if item_value.startswith("cpe"): core_tuple = CoreSocketTuple(section_name) - cpe_core = core_tuple.find_in_topology(self.cpu_topology) + cpe_core = core_tuple.core_id cpe_cores.append(cpe_core) elif item_value.startswith("inet"): core_tuple = CoreSocketTuple(section_name) - inet_core = core_tuple.find_in_topology(self.cpu_topology) + inet_core = core_tuple.core_id inet_cores.append(inet_core) elif item_value.startswith("arp"): core_tuple = CoreSocketTuple(section_name) - arp_core = core_tuple.find_in_topology(self.cpu_topology) + arp_core = core_tuple.core_id arp_cores.append(arp_core) # We check the tasks/core separately if item_value.startswith("arp_task"): core_tuple = CoreSocketTuple(section_name) - arp_task_core = core_tuple.find_in_topology(self.cpu_topology) + arp_task_core = core_tuple.core_id arp_tasks_core.append(arp_task_core) return cpe_cores, inet_cores, arp_cores, arp_tasks_core @@ -1385,10 +1741,13 @@ class ProxBngProfileHelper(ProxProfileHelper): time.sleep(3) self.sut.stop(self.all_rx_cores) - def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): - data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss) + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0, + line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)): + data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, + value, tolerated_loss, line_speed) - with data_helper, self.traffic_context(pkt_size, value): + with data_helper, self.traffic_context(pkt_size, + self.pct_10gbps(value, line_speed)): with data_helper.measure_tot_stats(): time.sleep(duration) # Getting statistics to calculate PPS at right speed.... @@ -1455,12 +1814,12 @@ class ProxVpeProfileHelper(ProxProfileHelper): if item_value.startswith("cpe"): core_tuple = CoreSocketTuple(section_name) - core_tag = core_tuple.find_in_topology(self.cpu_topology) + core_tag = core_tuple.core_id cpe_cores.append(core_tag) elif item_value.startswith("inet"): core_tuple = CoreSocketTuple(section_name) - inet_core = core_tuple.find_in_topology(self.cpu_topology) + inet_core = core_tuple.core_id inet_cores.append(inet_core) return cpe_cores, inet_cores @@ -1572,10 +1931,13 @@ class ProxVpeProfileHelper(ProxProfileHelper): time.sleep(3) self.sut.stop(self.all_rx_cores) - def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): - data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss) + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0, + line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)): + data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, + value, tolerated_loss, line_speed) - with data_helper, self.traffic_context(pkt_size, value): + with data_helper, self.traffic_context(pkt_size, + self.pct_10gbps(value, line_speed)): with data_helper.measure_tot_stats(): time.sleep(duration) # Getting statistics to calculate PPS at right speed.... @@ -1639,7 +2001,7 @@ class ProxlwAFTRProfileHelper(ProxProfileHelper): continue core_tuple = CoreSocketTuple(section_name) - core_tag = core_tuple.find_in_topology(self.cpu_topology) + core_tag = core_tuple.core_id for item_value in (v for k, v in section if k == 'name'): if item_value.startswith('tun'): tun_cores.append(core_tag) @@ -1761,10 +2123,13 @@ class ProxlwAFTRProfileHelper(ProxProfileHelper): time.sleep(3) self.sut.stop(self.all_rx_cores) - def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): - data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss) + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0, + line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)): + data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, + value, tolerated_loss, line_speed) - with data_helper, self.traffic_context(pkt_size, value): + with data_helper, self.traffic_context(pkt_size, + self.pct_10gbps(value, line_speed)): with data_helper.measure_tot_stats(): time.sleep(duration) # Getting statistics to calculate PPS at right speed.... @@ -1772,3 +2137,15 @@ class ProxlwAFTRProfileHelper(ProxProfileHelper): data_helper.latency = self.get_latency() return data_helper.result_tuple, data_helper.samples + + +class ProxIrqProfileHelper(ProxProfileHelper): + + __prox_profile_type__ = "IRQ Query" + + def __init__(self, resource_helper): + super(ProxIrqProfileHelper, self).__init__(resource_helper) + self._cores_tuple = None + self._ports_tuple = None + self.step_delta = 5 + self.step_time = 0.5 diff --git a/yardstick/network_services/vnf_generic/vnf/prox_irq.py b/yardstick/network_services/vnf_generic/vnf/prox_irq.py new file mode 100644 index 000000000..614066e46 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/prox_irq.py @@ -0,0 +1,200 @@ +# Copyright (c) 2018-2019 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import errno +import logging +import copy +import time + +from yardstick.common.process import check_if_process_failed +from yardstick.network_services.utils import get_nsb_option +from yardstick.network_services.vnf_generic.vnf.prox_vnf import ProxApproxVnf +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen +from yardstick.benchmark.contexts.base import Context +from yardstick.network_services.vnf_generic.vnf.prox_helpers import CoreSocketTuple +LOG = logging.getLogger(__name__) + + +class ProxIrq(SampleVNFTrafficGen): + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + vnfd_cpy = copy.deepcopy(vnfd) + super(ProxIrq, self).__init__(name, vnfd_cpy) + + self._vnf_wrapper = ProxApproxVnf( + name, vnfd, setup_env_helper_type, resource_helper_type) + self.bin_path = get_nsb_option('bin_path', '') + self.name = self._vnf_wrapper.name + self.ssh_helper = self._vnf_wrapper.ssh_helper + self.setup_helper = self._vnf_wrapper.setup_helper + self.resource_helper = self._vnf_wrapper.resource_helper + self.scenario_helper = self._vnf_wrapper.scenario_helper + self.irq_cores = None + + def terminate(self): + self._vnf_wrapper.terminate() + super(ProxIrq, self).terminate() + + def instantiate(self, scenario_cfg, context_cfg): + self._vnf_wrapper.instantiate(scenario_cfg, context_cfg) + self._tg_process = self._vnf_wrapper._vnf_process + + def wait_for_instantiate(self): + self._vnf_wrapper.wait_for_instantiate() + + def get_irq_cores(self): + cores = [] + mode = "irq" + + for section_name, section in self.setup_helper.prox_config_data: + if not section_name.startswith("core"): + continue + irq_mode = task_present = False + task_present_task = 0 + for key, value in section: + if key == "mode" and value == mode: + irq_mode = True + if key == "task": + task_present = True + task_present_task = int(value) + + if irq_mode: + if not task_present: + task_present_task = 0 + core_tuple = CoreSocketTuple(section_name) + core = core_tuple.core_id + cores.append((core, task_present_task)) + + return cores + +class ProxIrqVNF(ProxIrq, SampleVNFTrafficGen): + + APP_NAME = 'ProxIrqVNF' + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + ProxIrq.__init__(self, name, vnfd, setup_env_helper_type, + resource_helper_type) + + self.start_test_time = None + self.end_test_time = None + + def vnf_execute(self, cmd, *args, **kwargs): + ignore_errors = kwargs.pop("_ignore_errors", False) + try: + return self.resource_helper.execute(cmd, *args, **kwargs) + except OSError as e: + if e.errno in {errno.EPIPE, errno.ESHUTDOWN, errno.ECONNRESET}: + if ignore_errors: + LOG.debug("ignoring vnf_execute exception %s for command %s", e, cmd) + else: + raise + else: + raise + + def collect_kpi(self): + # check if the tg processes have exited + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} + for proc in (self._tg_process, self._traffic_process): + check_if_process_failed(proc) + + if self.resource_helper is None: + return result + + if self.irq_cores is None: + self.setup_helper.build_config_file() + self.irq_cores = self.get_irq_cores() + + data = self.vnf_execute('irq_core_stats', self.irq_cores) + new_data = copy.deepcopy(data) + + self.end_test_time = time.time() + self.vnf_execute('reset_stats') + + if self.start_test_time is None: + new_data = {} + else: + test_time = self.end_test_time - self.start_test_time + for index, item in data.items(): + for counter, value in item.items(): + if counter.startswith("bucket_")or \ + counter.startswith("overflow"): + if value is 0: + del new_data[index][counter] + else: + new_data[index][counter] = float(value) / test_time + + self.start_test_time = time.time() + + result["collect_stats"] = new_data + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) + + return result + +class ProxIrqGen(ProxIrq, SampleVNFTrafficGen): + + APP_NAME = 'ProxIrqGen' + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + ProxIrq.__init__(self, name, vnfd, setup_env_helper_type, + resource_helper_type) + self.start_test_time = None + self.end_test_time = None + + def collect_kpi(self): + # check if the tg processes have exited + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} + for proc in (self._tg_process, self._traffic_process): + check_if_process_failed(proc) + + if self.resource_helper is None: + return result + + if self.irq_cores is None: + self.setup_helper.build_config_file() + self.irq_cores = self.get_irq_cores() + + data = self.resource_helper.sut.irq_core_stats(self.irq_cores) + new_data = copy.deepcopy(data) + + self.end_test_time = time.time() + self.resource_helper.sut.reset_stats() + + if self.start_test_time is None: + new_data = {} + else: + test_time = self.end_test_time - self.start_test_time + for index, item in data.items(): + for counter, value in item.items(): + if counter.startswith("bucket_") or \ + counter.startswith("overflow"): + if value is 0: + del new_data[index][counter] + else: + new_data[index][counter] = float(value) / test_time + + self.start_test_time = time.time() + + result["collect_stats"] = new_data + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) + + return result diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py index b7d295eee..c9abc757e 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,12 +14,15 @@ import errno import logging - +import datetime +import time from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxDpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper -from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, PROCESS_JOIN_TIMEOUT +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF +from yardstick.network_services import constants +from yardstick.benchmark.contexts import base as context_base LOG = logging.getLogger(__name__) @@ -39,6 +42,10 @@ class ProxApproxVnf(SampleVNF): if resource_helper_type is None: resource_helper_type = ProxResourceHelper + self.prev_packets_in = 0 + self.prev_packets_sent = 0 + self.prev_tsc = 0 + self.tsc_hz = 0 super(ProxApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) @@ -62,41 +69,90 @@ class ProxApproxVnf(SampleVNF): def collect_kpi(self): # we can't get KPIs if the VNF is down - check_if_process_failed(self._vnf_process) + check_if_process_failed(self._vnf_process, 0.01) + + physical_node = context_base.Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} if self.resource_helper is None: - result = { + result.update({ "packets_in": 0, "packets_dropped": 0, "packets_fwd": 0, + "curr_packets_in": 0, + "curr_packets_fwd": 0, "collect_stats": {"core": {}}, - } + }) return result + if (self.tsc_hz == 0): + self.tsc_hz = float(self.resource_helper.sut.hz()) + LOG.debug("TSC = %f", self.tsc_hz) + if (self.tsc_hz == 0): + raise RuntimeError("Unable to retrieve TSC") + # use all_ports so we only use ports matched in topology port_count = len(self.vnfd_helper.port_pairs.all_ports) if port_count not in {1, 2, 4}: raise RuntimeError("Failed ..Invalid no of ports .. " "1, 2 or 4 ports only supported at this time") - port_stats = self.vnf_execute('port_stats', range(port_count)) - try: - rx_total = port_stats[6] - tx_total = port_stats[7] - except IndexError: - LOG.error("port_stats parse fail %s", port_stats) - # return empty dict so we don't mess up existing KPIs + tmpPorts = [self.vnfd_helper.port_num(port_name) + for port_name in self.vnfd_helper.port_pairs.all_ports] + ok = False + timeout = time.time() + constants.RETRY_TIMEOUT + while not ok: + ok, all_port_stats = self.vnf_execute('multi_port_stats', tmpPorts) + if time.time() > timeout: + break + + if ok: + rx_total = tx_total = tsc = 0 + try: + for single_port_stats in all_port_stats: + rx_total = rx_total + single_port_stats[1] + tx_total = tx_total + single_port_stats[2] + tsc = tsc + single_port_stats[5] + except (TypeError, IndexError): + LOG.error("Invalid data ...") + return {} + else: return {} - result = { + tsc = tsc / port_count + + result.update({ "packets_in": rx_total, "packets_dropped": max((tx_total - rx_total), 0), "packets_fwd": tx_total, # we share ProxResourceHelper with TG, but we want to collect # collectd KPIs here and not TG KPIs, so use a different method name "collect_stats": self.resource_helper.collect_collectd_kpi(), - } - LOG.debug("%s collect KPIs %s", self.APP_NAME, result) + }) + try: + curr_packets_in = int(((rx_total - self.prev_packets_in) * self.tsc_hz) + / (tsc - self.prev_tsc)) + except ZeroDivisionError: + LOG.error("Error.... Divide by Zero") + curr_packets_in = 0 + + try: + curr_packets_fwd = int(((tx_total - self.prev_packets_sent) * self.tsc_hz) + / (tsc - self.prev_tsc)) + except ZeroDivisionError: + LOG.error("Error.... Divide by Zero") + curr_packets_fwd = 0 + + result["curr_packets_in"] = curr_packets_in + result["curr_packets_fwd"] = curr_packets_fwd + + self.prev_packets_in = rx_total + self.prev_packets_sent = tx_total + self.prev_tsc = tsc + + LOG.debug("%s collect KPIs %s %s", self.APP_NAME, datetime.datetime.now(), result) return result def _tear_down(self): @@ -119,5 +175,5 @@ class ProxApproxVnf(SampleVNF): self._tear_down() if self._vnf_process is not None: LOG.debug("joining before terminate %s", self._vnf_process.name) - self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT) self._vnf_process.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/router_vnf.py b/yardstick/network_services/vnf_generic/vnf/router_vnf.py index aea27ffa6..f1486bdb4 100644 --- a/yardstick/network_services/vnf_generic/vnf/router_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/router_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,7 +47,6 @@ class RouterVNF(SampleVNF): def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg self.context_cfg = context_cfg - self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) self.configure_routes(self.name, scenario_cfg, context_cfg) def wait_for_instantiate(self): @@ -107,8 +106,11 @@ class RouterVNF(SampleVNF): stdout = self.ssh_helper.execute(ip_link_stats)[1] link_stats = self.get_stats(stdout) # get RX/TX from link_stats and assign to results + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) result = { + "physical_node": physical_node, "packets_in": 0, "packets_dropped": 0, "packets_fwd": 0, diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 5eeb6c889..a369a3ae6 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,91 +11,43 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Base class implementation for generic vnf implementation """ -from __future__ import absolute_import - -import posixpath -import time import logging +import decimal +from multiprocessing import Queue, Value, Process, JoinableQueue import os +import posixpath import re import subprocess -from collections import Mapping -from multiprocessing import Queue, Value, Process +import time -from six.moves import cStringIO +from trex_stl_lib.trex_stl_client import LoggerApi +from trex_stl_lib.trex_stl_client import STLClient +from trex_stl_lib.trex_stl_exceptions import STLError from yardstick.benchmark.contexts.base import Context -from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file from yardstick.common import exceptions as y_exceptions from yardstick.common.process import check_if_process_failed -from yardstick.network_services.helpers.samplevnf_helper import PortPairs +from yardstick.common import utils +from yardstick.common import yaml_loader +from yardstick.network_services import constants +from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig -from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper from yardstick.network_services.nfvi.resource import ResourceProfile +from yardstick.network_services.utils import get_nsb_option +from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper -from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen -from yardstick.network_services.utils import get_nsb_option - -from trex_stl_lib.trex_stl_client import STLClient -from trex_stl_lib.trex_stl_client import LoggerApi -from trex_stl_lib.trex_stl_exceptions import STLError - -from yardstick.ssh import AutoConnectSSH - -DPDK_VERSION = "dpdk-16.07" +from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper +from yardstick.benchmark.contexts.node import NodeContext LOG = logging.getLogger(__name__) -REMOTE_TMP = "/tmp" -DEFAULT_VNF_TIMEOUT = 3600 -PROCESS_JOIN_TIMEOUT = 3 - - -class VnfSshHelper(AutoConnectSSH): - - def __init__(self, node, bin_path, wait=None): - self.node = node - kwargs = self.args_from_node(self.node) - if wait: - kwargs.setdefault('wait', wait) - - super(VnfSshHelper, self).__init__(**kwargs) - self.bin_path = bin_path - - @staticmethod - def get_class(): - # must return static class name, anything else refers to the calling class - # i.e. the subclass, not the superclass - return VnfSshHelper - - def copy(self): - # this copy constructor is different from SSH classes, since it uses node - return self.get_class()(self.node, self.bin_path) - - def upload_config_file(self, prefix, content): - cfg_file = os.path.join(REMOTE_TMP, prefix) - LOG.debug(content) - file_obj = cStringIO(content) - self.put_file_obj(file_obj, cfg_file) - return cfg_file - - def join_bin_path(self, *args): - return os.path.join(self.bin_path, *args) - - def provision_tool(self, tool_path=None, tool_file=None): - if tool_path is None: - tool_path = self.bin_path - return super(VnfSshHelper, self).provision_tool(tool_path, tool_file) - - class SetupEnvHelper(object): - CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config") - CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script") + CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config") + CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script") DEFAULT_CONFIG_TPL_CFG = "sample.cfg" PIPELINE_COMMAND = '' VNF_TYPE = "SAMPLE" @@ -105,6 +57,7 @@ class SetupEnvHelper(object): self.vnfd_helper = vnfd_helper self.ssh_helper = ssh_helper self.scenario_helper = scenario_helper + self.collectd_options = {} def build_config(self): raise NotImplementedError @@ -123,6 +76,7 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): APP_NAME = 'DpdkVnf' FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'" + NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages' @staticmethod def _update_packet_type(ip_pipeline_cfg, traffic_options): @@ -158,25 +112,11 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.used_drivers = None self.dpdk_bind_helper = DpdkBindHelper(ssh_helper) - def _setup_hugepages(self): - cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo" - hugepages = self.ssh_helper.execute(cmd)[1].rstrip() - - memory_path = \ - '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages - self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path) - - if hugepages == "2048kB": - pages = 8192 - else: - pages = 16 - - self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path)) - def build_config(self): vnf_cfg = self.scenario_helper.vnf_cfg task_path = self.scenario_helper.task_path + config_file = vnf_cfg.get('file') lb_count = vnf_cfg.get('lb_count', 3) lb_config = vnf_cfg.get('lb_config', 'SW') worker_config = vnf_cfg.get('worker_config', '1C/1T') @@ -189,7 +129,15 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): 'vnf_type': self.VNF_TYPE, } - config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path) + # read actions/rules from file + acl_options = None + acl_file_name = self.scenario_helper.options.get('rules') + if acl_file_name: + with utils.open_relative_file(acl_file_name, task_path) as infile: + acl_options = yaml_loader.yaml_load(infile) + + config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, + task_path) config_basename = posixpath.basename(self.CFG_CONFIG) script_basename = posixpath.basename(self.CFG_SCRIPT) multiport = MultiPortConfig(self.scenario_helper.topology, @@ -204,21 +152,34 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.socket) multiport.generate_config() - with open(self.CFG_CONFIG) as handle: - new_config = handle.read() - - new_config = self._update_traffic_type(new_config, traffic_options) - new_config = self._update_packet_type(new_config, traffic_options) - + if config_file: + with utils.open_relative_file(config_file, task_path) as infile: + new_config = ['[EAL]'] + vpci = [] + for port in self.vnfd_helper.port_pairs.all_ports: + interface = self.vnfd_helper.find_interface(name=port) + vpci.append(interface['virtual-interface']["vpci"]) + new_config.extend('w = {0}'.format(item) for item in vpci) + new_config = '\n'.join(new_config) + '\n' + infile.read() + else: + with open(self.CFG_CONFIG) as handle: + new_config = handle.read() + new_config = self._update_traffic_type(new_config, traffic_options) + new_config = self._update_packet_type(new_config, traffic_options) self.ssh_helper.upload_config_file(config_basename, new_config) self.ssh_helper.upload_config_file(script_basename, - multiport.generate_script(self.vnfd_helper)) + multiport.generate_script(self.vnfd_helper, + self.get_flows_config(acl_options))) LOG.info("Provision and start the %s", self.APP_NAME) self._build_pipeline_kwargs() return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs) - def _build_pipeline_kwargs(self): + def get_flows_config(self, options=None): # pylint: disable=unused-argument + """No actions/rules (flows) by default""" + return None + + def _build_pipeline_kwargs(self, cfg_file=None, script=None): tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) # count the number of actual ports in the list of pairs # remove duplicate ports @@ -229,16 +190,24 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): port_nums = self.vnfd_helper.port_nums(ports) # create mask from all the dpdk port numbers ports_mask_hex = hex(sum(2 ** num for num in port_nums)) + + vnf_cfg = self.scenario_helper.vnf_cfg + lb_config = vnf_cfg.get('lb_config', 'SW') + worker_threads = vnf_cfg.get('worker_threads', 3) + hwlb = '' + if lb_config == 'HW': + hwlb = ' --hwlb %s' % worker_threads + self.pipeline_kwargs = { - 'cfg_file': self.CFG_CONFIG, - 'script': self.CFG_SCRIPT, + 'cfg_file': cfg_file if cfg_file else self.CFG_CONFIG, + 'script': script if script else self.CFG_SCRIPT, 'port_mask_hex': ports_mask_hex, 'tool_path': tool_path, + 'hwlb': hwlb, } def setup_vnf_environment(self): self._setup_dpdk() - self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces] self.kill_vnf() # bind before _setup_resources so we can use dpdk_port_num self._detect_and_bind_drivers() @@ -254,26 +223,17 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.ssh_helper.execute("sudo killall %s" % self.APP_NAME) def _setup_dpdk(self): - """ setup dpdk environment needed for vnf to run """ - - self._setup_hugepages() - self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio") + """Setup DPDK environment needed for VNF to run""" + hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16) + utils.setup_hugepages(self.ssh_helper, hugepages_gb * 1024 * 1024) + self.dpdk_bind_helper.load_dpdk_driver() - exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0] + exit_status = self.dpdk_bind_helper.check_dpdk_driver() if exit_status == 0: return - - dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION) - dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh") - exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0] - if exit_status != 0: - self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) - - def get_collectd_options(self): - options = self.scenario_helper.all_options.get("collectd", {}) - # override with specific node settings - options.update(self.scenario_helper.options.get("collectd", {})) - return options + else: + LOG.critical("DPDK Driver not installed") + return def _setup_resources(self): # what is this magic? how do we know which socket is for which port? @@ -287,16 +247,29 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): # this won't work because we don't have DPDK port numbers yet ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num) port_names = (intf["name"] for intf in ports) - collectd_options = self.get_collectd_options() - plugins = collectd_options.get("plugins", {}) + plugins = self.collectd_options.get("plugins", {}) + interval = self.collectd_options.get("interval") # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, - plugins=plugins, interval=collectd_options.get("interval"), + plugins=plugins, interval=interval, timeout=self.scenario_helper.timeout) + def _check_interface_fields(self): + num_nodes = len(self.scenario_helper.nodes) + # OpenStack instance creation time is probably proportional to the number + # of instances + timeout = 120 * num_nodes + dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces, + self.ssh_helper, timeout) + dpdk_node.check() + def _detect_and_bind_drivers(self): interfaces = self.vnfd_helper.interfaces + self._check_interface_fields() + # check for bound after probe + self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] + self.dpdk_bind_helper.read_status() self.dpdk_bind_helper.save_used_drivers() @@ -337,6 +310,7 @@ class ResourceHelper(object): self.resource = None self.setup_helper = setup_helper self.ssh_helper = setup_helper.ssh_helper + self._enable = True def setup(self): self.resource = self.setup_helper.setup_vnf_environment() @@ -344,22 +318,33 @@ class ResourceHelper(object): def generate_cfg(self): pass + def update_from_context(self, context, attr_name): + """Disable resource helper in case of baremetal context. + + And update appropriate node collectd options in context + """ + if isinstance(context, NodeContext): + self._enable = False + context.update_collectd_options_for_node(self.setup_helper.collectd_options, + attr_name) + def _collect_resource_kpi(self): result = {} - status = self.resource.check_if_sa_running("collectd")[0] - if status == 0: + status = self.resource.check_if_system_agent_running("collectd")[0] + if status == 0 and self._enable: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} return result def start_collect(self): - self.resource.initiate_systemagent(self.ssh_helper.bin_path) - self.resource.start() - self.resource.amqp_process_for_nfvi_kpi() + if self._enable: + self.resource.initiate_systemagent(self.ssh_helper.bin_path) + self.resource.start() + self.resource.amqp_process_for_nfvi_kpi() def stop_collect(self): - if self.resource: + if self.resource and self._enable: self.resource.stop() def collect_kpi(self): @@ -400,42 +385,17 @@ class ClientResourceHelper(ResourceHelper): try: return self.client.get_stats(*args, **kwargs) except STLError: - LOG.exception("TRex client not connected") - return {} - - def generate_samples(self, ports, key=None, default=None): - # needs to be used ports - last_result = self.get_stats(ports) - key_value = last_result.get(key, default) - - if not isinstance(last_result, Mapping): # added for mock unit test - self._terminated.value = 1 + LOG.error('TRex client not connected') return {} - samples = {} - # recalculate port for interface and see if it matches ports provided - for intf in self.vnfd_helper.interfaces: - name = intf["name"] - port = self.vnfd_helper.port_num(name) - if port in ports: - xe_value = last_result.get(port, {}) - samples[name] = { - "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), - "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), - "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), - "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), - "in_packets": int(xe_value.get("ipackets", 0)), - "out_packets": int(xe_value.get("opackets", 0)), - } - if key: - samples[name][key] = key_value - return samples + def _get_samples(self, ports, port_pg_id=False): + raise NotImplementedError() def _run_traffic_once(self, traffic_profile): traffic_profile.execute_traffic(self) self.client_started.value = 1 time.sleep(self.RUN_DURATION) - samples = self.generate_samples(traffic_profile.ports) + samples = self._get_samples(traffic_profile.ports) time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) @@ -448,12 +408,17 @@ class ClientResourceHelper(ResourceHelper): try: self._build_ports() self.client = self._connect() + if self.client is None: + LOG.critical("Failure to Connect ... unable to continue") + return + self.client.reset(ports=self.all_ports) self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) while self._terminated.value == 0: - self._run_traffic_once(traffic_profile) + if self._run_traffic_once(traffic_profile): + self._terminated.value = 1 self.client.stop(self.all_ports) self.client.disconnect() @@ -496,22 +461,35 @@ class ClientResourceHelper(ResourceHelper): server=self.vnfd_helper.mgmt_interface["ip"], verbose_level=LoggerApi.VERBOSE_QUIET) - # try to connect with 5s intervals, 30s max + # try to connect with 5s intervals for idx in range(6): try: client.connect() - break + for idx2 in range(6): + if client.is_connected(): + return client + LOG.info("Waiting to confirm connection %s .. Attempt %s", + idx, idx2) + time.sleep(1) + client.disconnect(stop_traffic=True, release_ports=True) except STLError: LOG.info("Unable to connect to Trex Server.. Attempt %s", idx) time.sleep(5) - return client + if client.is_connected(): + return client + else: + LOG.critical("Connection failure ..TRex username: %s server: %s", + self.vnfd_helper.mgmt_interface["user"], + self.vnfd_helper.mgmt_interface["ip"]) + return None class Rfc2544ResourceHelper(object): DEFAULT_CORRELATED_TRAFFIC = False DEFAULT_LATENCY = False DEFAULT_TOLERANCE = '0.0001 - 0.0001' + DEFAULT_RESOLUTION = '0.1' def __init__(self, scenario_helper): super(Rfc2544ResourceHelper, self).__init__() @@ -522,6 +500,8 @@ class Rfc2544ResourceHelper(object): self._rfc2544 = None self._tolerance_low = None self._tolerance_high = None + self._tolerance_precision = None + self._resolution = None @property def rfc2544(self): @@ -542,6 +522,12 @@ class Rfc2544ResourceHelper(object): return self._tolerance_high @property + def tolerance_precision(self): + if self._tolerance_precision is None: + self.get_rfc_tolerance() + return self._tolerance_precision + + @property def correlated_traffic(self): if self._correlated_traffic is None: self._correlated_traffic = \ @@ -555,14 +541,25 @@ class Rfc2544ResourceHelper(object): self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY) return self._latency + @property + def resolution(self): + if self._resolution is None: + self._resolution = float(self.get_rfc2544('resolution', + self.DEFAULT_RESOLUTION)) + return self._resolution + def get_rfc2544(self, name, default=None): return self.rfc2544.get(name, default) def get_rfc_tolerance(self): tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE) - tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-'))) - self._tolerance_low = next(tolerance_iter) - self._tolerance_high = next(tolerance_iter, self.tolerance_low) + tolerance_iter = iter(sorted( + decimal.Decimal(t.strip()) for t in tolerance_str.split('-'))) + tolerance_low = next(tolerance_iter) + tolerance_high = next(tolerance_iter, tolerance_low) + self._tolerance_precision = abs(tolerance_high.as_tuple().exponent) + self._tolerance_high = float(tolerance_high) + self._tolerance_low = float(tolerance_low) class SampleVNFDeployHelper(object): @@ -638,8 +635,10 @@ class ScenarioHelper(object): @property def timeout(self): - return self.options.get('timeout', DEFAULT_VNF_TIMEOUT) - + test_duration = self.scenario_cfg.get('runner', {}).get('duration', + self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)) + test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT) + return test_duration if test_duration > test_timeout else test_timeout class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ @@ -672,7 +671,6 @@ class SampleVNF(GenericVNF): self.resource_helper = resource_helper_type(self.setup_helper) self.context_cfg = None - self.nfvi_context = None self.pipeline_kwargs = {} self.uplink_ports = None self.downlink_ports = None @@ -686,49 +684,6 @@ class SampleVNF(GenericVNF): self.vnf_port_pairs = None self._vnf_process = None - def _build_ports(self): - self._port_pairs = PortPairs(self.vnfd_helper.interfaces) - self.networks = self._port_pairs.networks - self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports) - self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports) - self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports) - - def _get_route_data(self, route_index, route_type): - route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', [])) - for _ in range(route_index): - next(route_iter, '') - return next(route_iter, {}).get(route_type, '') - - def _get_port0localip6(self): - return_value = self._get_route_data(0, 'network') - LOG.info("_get_port0localip6 : %s", return_value) - return return_value - - def _get_port1localip6(self): - return_value = self._get_route_data(1, 'network') - LOG.info("_get_port1localip6 : %s", return_value) - return return_value - - def _get_port0prefixlen6(self): - return_value = self._get_route_data(0, 'netmask') - LOG.info("_get_port0prefixlen6 : %s", return_value) - return return_value - - def _get_port1prefixlen6(self): - return_value = self._get_route_data(1, 'netmask') - LOG.info("_get_port1prefixlen6 : %s", return_value) - return return_value - - def _get_port0gateway6(self): - return_value = self._get_route_data(0, 'network') - LOG.info("_get_port0gateway6 : %s", return_value) - return return_value - - def _get_port1gateway6(self): - return_value = self._get_route_data(1, 'network') - LOG.info("_get_port1gateway6 : %s", return_value) - return return_value - def _start_vnf(self): self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) @@ -739,10 +694,13 @@ class SampleVNF(GenericVNF): pass def instantiate(self, scenario_cfg, context_cfg): + self._update_collectd_options(scenario_cfg, context_cfg) self.scenario_helper.scenario_cfg = scenario_cfg self.context_cfg = context_cfg - self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) - # self.nfvi_context = None + self.resource_helper.update_from_context( + Context.get_context_from_server(self.scenario_helper.nodes[self.name]), + self.scenario_helper.nodes[self.name] + ) # vnf deploy is unsupported, use ansible playbooks if self.scenario_helper.options.get("vnf_deploy", False): @@ -750,6 +708,54 @@ class SampleVNF(GenericVNF): self.resource_helper.setup() self._start_vnf() + def _update_collectd_options(self, scenario_cfg, context_cfg): + """Update collectd configuration options + This function retrieves all collectd options contained in the test case + + definition builds a single dictionary combining them. The following fragment + represents a test case with the collectd options and priorities (1 highest, 3 lowest): + --- + schema: yardstick:task:0.1 + scenarios: + - type: NSPerf + nodes: + tg__0: trafficgen_0.yardstick + vnf__0: vnf_0.yardstick + options: + collectd: + <options> # COLLECTD priority 3 + vnf__0: + collectd: + plugins: + load + <options> # COLLECTD priority 2 + context: + type: Node + name: yardstick + nfvi_type: baremetal + file: /etc/yardstick/nodes/pod_ixia.yaml # COLLECTD priority 1 + """ + scenario_options = scenario_cfg.get('options', {}) + generic_options = scenario_options.get('collectd', {}) + scenario_node_options = scenario_options.get(self.name, {})\ + .get('collectd', {}) + context_node_options = context_cfg.get('nodes', {})\ + .get(self.name, {}).get('collectd', {}) + + options = generic_options + self._update_options(options, scenario_node_options) + self._update_options(options, context_node_options) + + self.setup_helper.collectd_options = options + + def _update_options(self, options, additional_options): + """Update collectd options and plugins dictionary""" + for k, v in additional_options.items(): + if isinstance(v, dict) and k in options: + options[k].update(v) + else: + options[k] = v + def wait_for_instantiate(self): buf = [] time.sleep(self.WAIT_TIME) # Give some time for config to load @@ -765,7 +771,6 @@ class SampleVNF(GenericVNF): LOG.info("%s VNF is up and running.", self.APP_NAME) self._vnf_up_post() self.queue_wrapper.clear() - self.resource_helper.start_collect() return self._vnf_process.exitcode if "PANIC" in message: @@ -778,6 +783,59 @@ class SampleVNF(GenericVNF): # by other VNF output self.q_in.put('\r\n') + def wait_for_initialize(self): + buf = [] + vnf_prompt_found = False + prompt_command = '\r\n' + script_name = 'non_existent_script_name' + done_string = 'Cannot open file "{}"'.format(script_name) + time.sleep(self.WAIT_TIME) # Give some time for config to load + while True: + if not self._vnf_process.is_alive(): + raise RuntimeError("%s VNF process died." % self.APP_NAME) + while self.q_out.qsize() > 0: + buf.append(self.q_out.get()) + message = ''.join(buf) + + if self.VNF_PROMPT in message and not vnf_prompt_found: + # Once we got VNF promt, it doesn't mean that the VNF is + # up and running/initialized completely. But we can run + # addition (any) VNF command and wait for it to complete + # as it will be finished ONLY at the end of the VNF + # initialization. So, this approach can be used to + # indentify that VNF is completely initialized. + LOG.info("Got %s VNF prompt.", self.APP_NAME) + prompt_command = "run {}\r\n".format(script_name) + self.q_in.put(prompt_command) + # Cut the buffer since we are not interesting to find + # the VNF prompt anymore + prompt_pos = message.find(self.VNF_PROMPT) + buf = [message[prompt_pos + len(self.VNF_PROMPT):]] + vnf_prompt_found = True + continue + + if done_string in message: + LOG.info("%s VNF is up and running.", self.APP_NAME) + self._vnf_up_post() + self.queue_wrapper.clear() + return self._vnf_process.exitcode + + if "PANIC" in message: + raise RuntimeError("Error starting %s VNF." % + self.APP_NAME) + + LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) + time.sleep(self.WAIT_TIME_FOR_SCRIPT) + # Send command again to display the expected prompt in case the + # expected text was corrupted by other VNF output + self.q_in.put(prompt_command) + + def start_collect(self): + self.resource_helper.start_collect() + + def stop_collect(self): + self.resource_helper.stop_collect() + def _build_run_kwargs(self): self.run_kwargs = { 'stdin': self.queue_wrapper, @@ -823,7 +881,7 @@ class SampleVNF(GenericVNF): if self._vnf_process is not None: # be proper and join first before we kill LOG.debug("joining before terminate %s", self._vnf_process.name) - self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT) self._vnf_process.terminate() # no terminate children here because we share processes with tg @@ -840,18 +898,21 @@ class SampleVNF(GenericVNF): def collect_kpi(self): # we can't get KPIs if the VNF is down - check_if_process_failed(self._vnf_process) + check_if_process_failed(self._vnf_process, 0.01) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} if m: - result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()} + result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}) result["collect_stats"] = self.resource_helper.collect_kpi() else: - result = { - "packets_in": 0, - "packets_fwd": 0, - "packets_dropped": 0, - } + result.update({"packets_in": 0, + "packets_fwd": 0, + "packets_dropped": 0}) + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -890,6 +951,39 @@ class SampleVNFTrafficGen(GenericTrafficGen): self.traffic_finished = False self._tg_process = None self._traffic_process = None + self._tasks_queue = JoinableQueue() + self._result_queue = Queue() + + def _test_runner(self, traffic_profile, tasks, results): + self.resource_helper.run_test(traffic_profile, tasks, results) + + def _init_traffic_process(self, traffic_profile): + name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, + traffic_profile.__class__.__name__, + os.getpid()) + self._traffic_process = Process(name=name, target=self._test_runner, + args=( + traffic_profile, self._tasks_queue, + self._result_queue)) + + self._traffic_process.start() + while self.resource_helper.client_started.value == 0: + time.sleep(1) + if not self._traffic_process.is_alive(): + break + + def run_traffic_once(self, traffic_profile): + if self.resource_helper.client_started.value == 0: + self._init_traffic_process(traffic_profile) + + # continue test - run next iteration + LOG.info("Run next iteration ...") + self._tasks_queue.put('RUN_TRAFFIC') + + def wait_on_traffic(self): + self._tasks_queue.join() + result = self._result_queue.get() + return result def _start_server(self): # we can't share ssh paramiko objects to force new connection @@ -897,6 +991,13 @@ class SampleVNFTrafficGen(GenericTrafficGen): def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg + self.resource_helper.update_from_context( + Context.get_context_from_server(self.scenario_helper.nodes[self.name]), + self.scenario_helper.nodes[self.name] + ) + + self.resource_helper.context_cfg = context_cfg + self.resource_helper.setup() # must generate_cfg after DPDK bind because we need port number self.resource_helper.generate_cfg() @@ -951,9 +1052,14 @@ class SampleVNFTrafficGen(GenericTrafficGen): def collect_kpi(self): # check if the tg processes have exited + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} for proc in (self._tg_process, self._traffic_process): check_if_process_failed(proc) - result = self.resource_helper.collect_kpi() + + result["collect_stats"] = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -967,12 +1073,12 @@ class SampleVNFTrafficGen(GenericTrafficGen): if self._traffic_process is not None: # be proper and try to join before terminating LOG.debug("joining before terminate %s", self._traffic_process.name) - self._traffic_process.join(PROCESS_JOIN_TIMEOUT) + self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT) self._traffic_process.terminate() if self._tg_process is not None: # be proper and try to join before terminating LOG.debug("joining before terminate %s", self._tg_process.name) - self._tg_process.join(PROCESS_JOIN_TIMEOUT) + self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT) self._tg_process.terminate() # no terminate children here because we share processes with vnf diff --git a/yardstick/network_services/vnf_generic/vnf/tg_imsbench_sipp.py b/yardstick/network_services/vnf_generic/vnf/tg_imsbench_sipp.py new file mode 100644 index 000000000..70557b848 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_imsbench_sipp.py @@ -0,0 +1,143 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from collections import deque + +from yardstick.network_services.vnf_generic.vnf import sample_vnf + +LOG = logging.getLogger(__name__) + + +class SippSetupEnvHelper(sample_vnf.SetupEnvHelper): + APP_NAME = "ImsbenchSipp" + + +class SippResourceHelper(sample_vnf.ClientResourceHelper): + pass + + +class SippVnf(sample_vnf.SampleVNFTrafficGen): + """ + This class calls the test script from TG machine, then gets the result file + from IMS machine. After that, the result file is handled line by line, and + is updated to database. + """ + + APP_NAME = "ImsbenchSipp" + APP_WORD = "ImsbenchSipp" + VNF_TYPE = "ImsbenchSipp" + HW_OFFLOADING_NFVI_TYPES = {'baremetal', 'sriov'} + RESULT = "/tmp/final_result.dat" + SIPP_RESULT = "/tmp/sipp_dat_files/final_result.dat" + LOCAL_PATH = "/tmp" + CMD = "./SIPp_benchmark.bash {} {} {} '{}'" + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = SippResourceHelper + if setup_env_helper_type is None: + setup_env_helper_type = SippSetupEnvHelper + super(SippVnf, self).__init__( + name, vnfd, setup_env_helper_type, resource_helper_type) + self.params = "" + self.pcscf_ip = self.vnfd_helper.interfaces[0]["virtual-interface"]\ + ["peer_intf"]["local_ip"] + self.sipp_ip = self.vnfd_helper.interfaces[0]["virtual-interface"]\ + ["local_ip"] + self.media_ip = self.vnfd_helper.interfaces[1]["virtual-interface"]\ + ["local_ip"] + self.queue = "" + self.count = 0 + + def instantiate(self, scenario_cfg, context_cfg): + super(SippVnf, self).instantiate(scenario_cfg, context_cfg) + scenario_cfg = {} + _params = [("port", 5060), ("start_user", 1), ("end_user", 10000), + ("init_reg_cps", 50), ("init_reg_max", 5000), ("reg_cps", 50), + ("reg_step", 10), ("rereg_cps", 10), ("rereg_step", 5), + ("dereg_cps", 10), ("dereg_step", 5), ("msgc_cps", 10), + ("msgc_step", 2), ("run_mode", "rtp"), ("call_cps", 10), + ("hold_time", 15), ("call_step", 5)] + + self.params = ';'.join([str(scenario_cfg.get("options", {}).get(k, v)) + for k, v in dict(_params).items()]) + + def wait_for_instantiate(self): + pass + + def get_result_files(self): + self.ssh_helper.get(self.SIPP_RESULT, self.LOCAL_PATH, True) + + # Example of result file: + # cat /tmp/final_result.dat + # timestamp:1000 reg:100 reg_saps:0 + # timestamp:2000 reg:100 reg_saps:50 + # timestamp:3000 reg:100 reg_saps:50 + # timestamp:4000 reg:100 reg_saps:50 + # ... + # reg_Requested_prereg:50 + # reg_Effective_prereg:49.49 + # reg_DOC:0 + # ... + @staticmethod + def handle_result_files(filename): + with open(filename, 'r') as f: + content = f.readlines() + result = [{k: round(float(v), 2) for k, v in [i.split(":", 1) for i in x.split()]} + for x in content if x] + return deque(result) + + def run_traffic(self, traffic_profile): + traffic_profile.execute_traffic(self) + cmd = self.CMD.format(self.sipp_ip, self.media_ip, + self.pcscf_ip, self.params) + self.ssh_helper.execute(cmd, None, 3600, False) + self.get_result_files() + self.queue = self.handle_result_files(self.RESULT) + + def collect_kpi(self): + result = {} + try: + result = self.queue.popleft() + except IndexError: + pass + return result + + @staticmethod + def count_line_num(fname): + try: + with open(fname, 'r') as f: + return sum(1 for line in f) + except IOError: + return 0 + + def is_ended(self): + """ + The test will end when the results are pushed into database. + It does not depend on the "duration" value, so this value will be set + enough big to make sure that the test will end before duration. + """ + num_lines = self.count_line_num(self.RESULT) + if self.count == num_lines: + LOG.debug('TG IS ENDED.....................') + self.count = 0 + return True + self.count += 1 + return False + + def terminate(self): + LOG.debug('TERMINATE:.....................') + self.resource_helper.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ixload.py b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py index 61c045405..38b00a4b2 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ixload.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py @@ -12,20 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import +import collections import csv import glob import logging import os import shutil +import subprocess + +from oslo_serialization import jsonutils + +from yardstick.common import utils +from yardstick.network_services.vnf_generic.vnf import sample_vnf -from collections import OrderedDict -from subprocess import call -from yardstick.common.utils import makedirs -from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen -from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper -from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file LOG = logging.getLogger(__name__) @@ -45,7 +45,8 @@ IXLOAD_CONFIG_TEMPLATE = '''\ }, "remote_server": "%s", "result_dir": "%s", - "ixload_cfg": "C:/Results/%s" + "ixload_cfg": "C:/Results/%s", + "links_param": %s }''' IXLOAD_CMD = "{ixloadpy} {http_ixload} {args}" @@ -61,11 +62,11 @@ class ResourceDataHelper(list): } -class IxLoadResourceHelper(ClientResourceHelper): +class IxLoadResourceHelper(sample_vnf.ClientResourceHelper): RESULTS_MOUNT = "/mnt/Results" - KPI_LIST = OrderedDict(( + KPI_LIST = collections.OrderedDict(( ('http_throughput', 'HTTP Total Throughput (Kbps)'), ('simulated_users', 'HTTP Simulated Users'), ('concurrent_connections', 'HTTP Concurrent Connections'), @@ -75,7 +76,8 @@ class IxLoadResourceHelper(ClientResourceHelper): def __init__(self, setup_helper): super(IxLoadResourceHelper, self).__init__(setup_helper) - self.result = OrderedDict((key, ResourceDataHelper()) for key in self.KPI_LIST) + self.result = collections.OrderedDict((key, ResourceDataHelper()) + for key in self.KPI_LIST) self.resource_file_name = '' self.data = None @@ -91,19 +93,20 @@ class IxLoadResourceHelper(ClientResourceHelper): self.result[key].append(value) def setup(self): - # TODO: fixupt scenario_helper to hanlde ixia + # NOTE: fixup scenario_helper to hanlde ixia self.resource_file_name = \ - find_relative_file(self.scenario_helper.scenario_cfg['ixia_profile'], - self.scenario_helper.scenario_cfg["task_path"]) - makedirs(self.RESULTS_MOUNT) + utils.find_relative_file( + self.scenario_helper.scenario_cfg['ixia_profile'], + self.scenario_helper.scenario_cfg["task_path"]) + utils.makedirs(self.RESULTS_MOUNT) cmd = MOUNT_CMD.format(self.vnfd_helper.mgmt_interface, self) LOG.debug(cmd) if not os.path.ismount(self.RESULTS_MOUNT): - call(cmd, shell=True) + subprocess.call(cmd, shell=True) shutil.rmtree(self.RESULTS_MOUNT, ignore_errors=True) - makedirs(self.RESULTS_MOUNT) + utils.makedirs(self.RESULTS_MOUNT) shutil.copy(self.resource_file_name, self.RESULTS_MOUNT) def make_aggregates(self): @@ -113,7 +116,7 @@ class IxLoadResourceHelper(ClientResourceHelper): def collect_kpi(self): if self.data: self._result.update(self.data) - LOG.info("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result)) + LOG.info("Collect %s KPIs %s", self.RESOURCE_WORD, self._result) return self._result def log(self): @@ -121,7 +124,7 @@ class IxLoadResourceHelper(ClientResourceHelper): LOG.debug(self.result[key]) -class IxLoadTrafficGen(SampleVNFTrafficGen): +class IxLoadTrafficGen(sample_vnf.SampleVNFTrafficGen): def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: @@ -131,6 +134,26 @@ class IxLoadTrafficGen(SampleVNFTrafficGen): resource_helper_type) self._result = {} + def update_gateways(self, links): + for name in links: + try: + gateway = next(intf["virtual-interface"]["dst_ip"] for intf in + self.setup_helper.vnfd_helper["vdu"][0][ + "external-interface"] if + intf["virtual-interface"]["vld_id"] == name) + + try: + links[name]["ip"]["gateway"] = gateway + except KeyError: + LOG.error("Invalid traffic profile: No IP section defined for %s", name) + raise + + except StopIteration: + LOG.debug("Cant find gateway for link %s", name) + links[name]["ip"]["gateway"] = "0.0.0.0" + + return links + def run_traffic(self, traffic_profile): ports = [] card = None @@ -142,11 +165,16 @@ class IxLoadTrafficGen(SampleVNFTrafficGen): for csv_file in glob.iglob(self.ssh_helper.join_bin_path('*.csv')): os.unlink(csv_file) + links_param = self.update_gateways( + traffic_profile.get_links_param()) + ixia_config = self.vnfd_helper.mgmt_interface["tg-config"] ixload_config = IXLOAD_CONFIG_TEMPLATE % ( ixia_config["ixchassis"], ports, card, self.vnfd_helper.mgmt_interface["ip"], self.ssh_helper.bin_path, - os.path.basename(self.resource_helper.resource_file_name)) + os.path.basename(self.resource_helper.resource_file_name), + jsonutils.dumps(links_param) + ) http_ixload_path = os.path.join(VNF_PATH, "../../traffic_profile") @@ -156,7 +184,7 @@ class IxLoadTrafficGen(SampleVNFTrafficGen): args="'%s'" % ixload_config) LOG.debug(cmd) - call(cmd, shell=True) + subprocess.call(cmd, shell=True) with open(self.ssh_helper.join_bin_path("ixLoad_HTTP_Client.csv")) as csv_file: lines = csv_file.readlines()[10:] @@ -170,9 +198,6 @@ class IxLoadTrafficGen(SampleVNFTrafficGen): self.resource_helper.log() self.resource_helper.data = self.resource_helper.make_aggregates() - def instantiate(self, scenario_cfg, context_cfg): - super(IxLoadTrafficGen, self).instantiate(scenario_cfg, context_cfg) - def terminate(self): - call(["pkill", "-9", "http_ixload.py"]) + subprocess.call(["pkill", "-9", "http_ixload.py"]) super(IxLoadTrafficGen, self).terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py new file mode 100644 index 000000000..285374a92 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py @@ -0,0 +1,1226 @@ +# Copyright (c) 2018-2019 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import logging +import requests +import six +import time + +from yardstick.common import exceptions +from yardstick.common import utils as common_utils +from yardstick.common import yaml_loader +from yardstick.network_services import utils as net_serv_utils +from yardstick.network_services.vnf_generic.vnf import sample_vnf + +try: + from lsapi import LsApi +except ImportError: + LsApi = common_utils.ErrorClass + +LOG = logging.getLogger(__name__) + + +class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen): + APP_NAME = 'LandslideTG' + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = LandslideResourceHelper + super(LandslideTrafficGen, self).__init__(name, vnfd, + setup_env_helper_type, + resource_helper_type) + + self.bin_path = net_serv_utils.get_nsb_option('bin_path') + self.name = name + self.runs_traffic = True + self.traffic_finished = False + self.session_profile = None + + def listen_traffic(self, traffic_profile): + pass + + def terminate(self): + self.resource_helper.disconnect() + + def instantiate(self, scenario_cfg, context_cfg): + super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg) + self.resource_helper.connect() + + # Create test servers + test_servers = [x['test_server'] for x in self.vnfd_helper['config']] + self.resource_helper.create_test_servers(test_servers) + + # Create SUTs + [self.resource_helper.create_suts(x['suts']) for x in + self.vnfd_helper['config']] + + # Fill in test session based on session profile and test case options + self._load_session_profile() + + def run_traffic(self, traffic_profile): + self.resource_helper.abort_running_tests() + # Update DMF profile with related test case options + traffic_profile.update_dmf(self.scenario_helper.all_options) + # Create DMF in test user library + self.resource_helper.create_dmf(traffic_profile.dmf_config) + # Create/update test session in test user library + self.resource_helper.create_test_session(self.session_profile) + # Start test session + self.resource_helper.create_running_tests(self.session_profile['name']) + + def collect_kpi(self): + return self.resource_helper.collect_kpi() + + def wait_for_instantiate(self): + pass + + @staticmethod + def _update_session_suts(suts, testcase): + """ Create SUT entry. Update related EPC block in session profile. """ + for sut in suts: + # Update session profile EPC element with SUT info from pod file + tc_role = testcase['parameters'].get(sut['role']) + if tc_role: + _param = {} + if tc_role['class'] == 'Sut': + _param['name'] = sut['name'] + elif tc_role['class'] == 'TestNode': + _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'} + if x in sut and sut[x]}) + testcase['parameters'][sut['role']].update(_param) + else: + LOG.info('Unexpected SUT role in pod file: "%s".', sut['role']) + return testcase + + def _update_session_test_servers(self, test_server, _tsgroup_index): + """ Update tsId, reservations, pre-resolved ARP in session profile """ + # Update test server name + test_groups = self.session_profile['tsGroups'] + test_groups[_tsgroup_index]['tsId'] = test_server['name'] + + # Update preResolvedArpAddress + arp_key = 'preResolvedArpAddress' + _preresolved_arp = test_server.get(arp_key) # list of dicts + if _preresolved_arp: + test_groups[_tsgroup_index][arp_key] = _preresolved_arp + + # Update reservations + if 'phySubnets' in test_server: + reservation = {'tsId': test_server['name'], + 'tsIndex': _tsgroup_index, + 'tsName': test_server['name'], + 'phySubnets': test_server['phySubnets']} + if 'reservations' in self.session_profile: + self.session_profile['reservations'].append(reservation) + else: + self.session_profile['reservePorts'] = 'true' + self.session_profile['reservations'] = [reservation] + + def _update_session_library_name(self, test_session): + """Update DMF library name in session profile""" + for _ts_group in test_session['tsGroups']: + for _tc in _ts_group['testCases']: + try: + for _mainflow in _tc['parameters']['Dmf']['mainflows']: + _mainflow['library'] = \ + self.vnfd_helper.mgmt_interface['user'] + except KeyError: + pass + + @staticmethod + def _update_session_tc_params(tc_options, testcase): + for _param_key in tc_options: + if _param_key == 'AssociatedPhys': + testcase[_param_key] = tc_options[_param_key] + continue + testcase['parameters'][_param_key] = tc_options[_param_key] + return testcase + + def _load_session_profile(self): + + with common_utils.open_relative_file( + self.scenario_helper.scenario_cfg['session_profile'], + self.scenario_helper.task_path) as stream: + self.session_profile = yaml_loader.yaml_load(stream) + + # Raise exception if number of entries differs in following files, + _config_files = ['pod file', 'session_profile file', 'test_case file'] + # Count testcases number in all tsGroups of session profile + session_tests_num = [xx for x in self.session_profile['tsGroups'] + for xx in x['testCases']] + # Create a set containing number of list elements in each structure + _config_files_blocks_num = [ + len(x) for x in + (self.vnfd_helper['config'], # test_servers and suts info + session_tests_num, + self.scenario_helper.all_options['test_cases'])] # test case file + + if len(set(_config_files_blocks_num)) != 1: + raise RuntimeError('Unequal number of elements. {}'.format( + dict(six.moves.zip_longest(_config_files, + _config_files_blocks_num)))) + + ts_names = set() + _tsgroup_idx = -1 + _testcase_idx = 0 + + # Iterate over data structures to overwrite session profile defaults + # _config: single list element holding test servers and SUTs info + # _tc_options: single test case parameters + for _config, tc_options in zip( + self.vnfd_helper['config'], # test servers and SUTS + self.scenario_helper.all_options['test_cases']): # testcase + + _ts_config = _config['test_server'] + + # Calculate test group/test case indexes based on test server name + if _ts_config['name'] in ts_names: + _testcase_idx += 1 + else: + _tsgroup_idx += 1 + _testcase_idx = 0 + + _testcase = \ + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx] + + if _testcase['type'] != _ts_config['role']: + raise RuntimeError( + 'Test type mismatch in TC#{} of test server {}'.format( + _testcase_idx, _ts_config['name'])) + + # Fill session profile with test servers parameters + if _ts_config['name'] not in ts_names: + self._update_session_test_servers(_ts_config, _tsgroup_idx) + ts_names.add(_ts_config['name']) + + # Fill session profile with suts parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_suts(_config['suts'], _testcase)) + + # Update test case parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_tc_params(tc_options, _testcase)) + + self._update_session_library_name(self.session_profile) + + +class LandslideResourceHelper(sample_vnf.ClientResourceHelper): + """Landslide TG helper class""" + + REST_STATUS_CODES = {'OK': 200, 'CREATED': 201, 'NO CHANGE': 409} + REST_API_CODES = {'NOT MODIFIED': 500810} + + def __init__(self, setup_helper): + super(LandslideResourceHelper, self).__init__(setup_helper) + self._result = {} + self.vnfd_helper = setup_helper.vnfd_helper + self.scenario_helper = setup_helper.scenario_helper + + # TAS Manager config initialization + self._url = None + self._user_id = None + self.session = None + self.license_data = {} + + # TCL session initialization + self._tcl = LandslideTclClient(LsTclHandler(), self) + + self.session = requests.Session() + self.running_tests_uri = 'runningTests' + self.test_session_uri = 'testSessions' + self.test_serv_uri = 'testServers' + self.suts_uri = 'suts' + self.users_uri = 'users' + self.user_lib_uri = None + self.run_id = None + + def abort_running_tests(self, timeout=60, delay=5): + """ Abort running test sessions, if any """ + _start_time = time.time() + while time.time() < _start_time + timeout: + run_tests_states = {x['id']: x['testStateOrStep'] + for x in self.get_running_tests()} + if not set(run_tests_states.values()).difference( + {'COMPLETE', 'COMPLETE_ERROR'}): + break + else: + [self.stop_running_tests(running_test_id=_id, force=True) + for _id, _state in run_tests_states.items() + if 'COMPLETE' not in _state] + time.sleep(delay) + else: + raise RuntimeError( + 'Some test runs not stopped during {} seconds'.format(timeout)) + + def _build_url(self, resource, action=None): + """ Build URL string + + :param resource: REST API resource name + :type resource: str + :param action: actions name and value + :type action: dict('name': <str>, 'value': <str>) + :returns str: REST API resource name with optional action info + """ + # Action is optional and accepted only in presence of resource param + if action and not resource: + raise ValueError("Resource name not provided") + # Concatenate actions + _action = ''.join(['?{}={}'.format(k, v) for k, v in + action.items()]) if action else '' + + return ''.join([self._url, resource, _action]) + + def get_response_params(self, method, resource, params=None): + """ Retrieve params from JSON response of specific resource URL + + :param method: one of supported REST API methods + :type method: str + :param resource: URI, requested resource name + :type resource: str + :param params: attributes to be found in JSON response + :type params: list(str) + """ + _res = [] + params = params if params else [] + response = self.exec_rest_request(method, resource) + # Get substring between last slash sign and question mark (if any) + url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0] + _response_json = response.json() + # Expect dict(), if URL last part and top dict key don't match + # Else, if they match, expect list() + k, v = list(_response_json.items())[0] + if k != url_last_part: + v = [v] # v: list(dict(str: str)) + # Extract params, or whole list of dicts (without top level key) + for x in v: + _res.append({param: x[param] for param in params} if params else x) + return _res + + def _create_user(self, auth, level=1): + """ Create new user + + :param auth: data to create user account on REST server + :type auth: dict + :param level: Landslide user permissions level + :type level: int + :returns int: user id + """ + # Set expiration date in two years since account creation date + _exp_date = time.strftime( + '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2)) + _username = auth['user'] + _fields = {"contactInformation": "", "expiresOn": _exp_date, + "fullName": "Test User", + "isActive": "true", "level": level, + "password": auth['password'], + "username": _username} + _response = self.exec_rest_request('post', self.users_uri, + json_data=_fields, raise_exc=False) + _resp_json = _response.json() + if _response.status_code == self.REST_STATUS_CODES['CREATED']: + # New user created + _id = _resp_json['id'] + LOG.info("New user created: username='%s', id='%s'", _username, + _id) + elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']: + # User already exists + LOG.info("Account '%s' already exists.", _username) + # Get user id + _id = self._modify_user(_username, {"isActive": "true"})['id'] + else: + raise exceptions.RestApiError( + 'Error during new user "{}" creation'.format(_username)) + return _id + + def _modify_user(self, username, fields): + """ Modify information about existing user + + :param username: user name of account to be modified + :type username: str + :param fields: data to modify user account on REST server + :type fields: dict + :returns dict: user info + """ + _response = self.exec_rest_request('post', self.users_uri, + action={'username': username}, + json_data=fields, raise_exc=False) + if _response.status_code == self.REST_STATUS_CODES['OK']: + _response = _response.json() + else: + raise exceptions.RestApiError( + 'Error during user "{}" data update: {}'.format( + username, + _response.status_code)) + LOG.info("User account '%s' modified: '%s'", username, _response) + return _response + + def _delete_user(self, username): + """ Delete user account + + :param username: username field + :type username: str + :returns bool: True if succeeded + """ + self.exec_rest_request('delete', self.users_uri, + action={'username': username}) + + def _get_users(self, username=None): + """ Get user records from REST server + + :param username: username field + :type username: None|str + :returns list(dict): empty list, or user record, or list of all users + """ + _response = self.get_response_params('get', self.users_uri) + _res = [u for u in _response if + u['username'] == username] if username else _response + return _res + + def exec_rest_request(self, method, resource, action=None, json_data=None, + logs=True, raise_exc=True): + """ Execute REST API request, return response object + + :param method: one of supported requests ('post', 'get', 'delete') + :type method: str + :param resource: URL of resource + :type resource: str + :param action: data used to provide URI located after question mark + :type action: dict + :param json_data: mandatory only for 'post' method + :type json_data: dict + :param logs: debug logs display flag + :type raise_exc: bool + :param raise_exc: if True, raise exception on REST API call error + :returns requests.Response(): REST API call response object + """ + json_data = json_data if json_data else {} + action = action if action else {} + _method = method.upper() + method = method.lower() + if method not in ('post', 'get', 'delete'): + raise ValueError("Method '{}' not supported".format(_method)) + + if method == 'post' and not action: + if not (json_data and isinstance(json_data, collections.Mapping)): + raise ValueError( + 'JSON data missing in {} request'.format(_method)) + + r = getattr(self.session, method)(self._build_url(resource, action), + json=json_data) + if raise_exc and not r.ok: + msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format( + method, self._build_url(resource, action), r.reason) + raise exceptions.RestApiError(msg) + + if logs: + LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method, + r.request.url) + LOG.debug("Response: %s", r.json()) + return r + + def connect(self): + """Connect to RESTful server using test user account""" + tas_info = self.vnfd_helper['mgmt-interface'] + # Supported REST Server ports: HTTP - 8080, HTTPS - 8181 + _port = '8080' if tas_info['proto'] == 'http' else '8181' + tas_info.update({'port': _port}) + self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info) + self.session.headers.update({'Accept': 'application/json', + 'Content-type': 'application/json'}) + # Login with super user to create test user + self.session.auth = ( + tas_info['super-user'], tas_info['super-user-password']) + LOG.info("Connect using superuser: server='%s'", self._url) + auth = {x: tas_info[x] for x in ('user', 'password')} + self._user_id = self._create_user(auth) + # Login with test user + self.session.auth = auth['user'], auth['password'] + # Test user validity + self.exec_rest_request('get', '') + + self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri) + LOG.info("Login with test user: server='%s'", self._url) + # Read existing license + self.license_data['lic_id'] = tas_info['license'] + + # Tcl client init + self._tcl.connect(tas_info['ip'], *self.session.auth) + + return self.session + + def disconnect(self): + self.session = None + self._tcl.disconnect() + + def terminate(self): + self._terminated.value = 1 + + def create_dmf(self, dmf): + if isinstance(dmf, dict): + dmf = [dmf] + for _dmf in dmf: + # Update DMF library name in traffic profile + _dmf['dmf'].update( + {'library': self.vnfd_helper.mgmt_interface['user']}) + # Create DMF on Landslide server + self._tcl.create_dmf(_dmf) + + def delete_dmf(self, dmf): + if isinstance(dmf, list): + for _dmf in dmf: + self._tcl.delete_dmf(_dmf) + else: + self._tcl.delete_dmf(dmf) + + def create_suts(self, suts): + # Keep only supported keys in suts object + for _sut in suts: + sut_entry = {k: v for k, v in _sut.items() + if k not in {'phy', 'nextHop', 'role'}} + _response = self.exec_rest_request( + 'post', self.suts_uri, json_data=sut_entry, + logs=False, raise_exc=False) + if _response.status_code != self.REST_STATUS_CODES['CREATED']: + LOG.info(_response.reason) # Failed to create + _name = sut_entry.pop('name') + # Modify existing SUT + self.configure_sut(sut_name=_name, json_data=sut_entry) + else: + LOG.info("SUT created: %s", sut_entry) + + def get_suts(self, suts_id=None): + if suts_id: + _suts = self.exec_rest_request( + 'get', '{}/{}'.format(self.suts_uri, suts_id)).json() + else: + _suts = self.get_response_params('get', self.suts_uri) + + return _suts + + def configure_sut(self, sut_name, json_data): + """ Modify information of specific SUTs + + :param sut_name: name of existing SUT + :type sut_name: str + :param json_data: SUT settings + :type json_data: dict() + """ + LOG.info("Modifying SUT information...") + _response = self.exec_rest_request('post', + self.suts_uri, + action={'name': sut_name}, + json_data=json_data, + raise_exc=False) + if _response.status_code not in {self.REST_STATUS_CODES[x] for x in + {'OK', 'NO CHANGE'}}: + raise exceptions.RestApiError(_response.reason) + + LOG.info("Modified SUT: %s", sut_name) + + def delete_suts(self, suts_ids=None): + if not suts_ids: + _curr_suts = self.get_response_params('get', self.suts_uri) + suts_ids = [x['id'] for x in _curr_suts] + LOG.info("Deleting SUTs with following IDs: %s", suts_ids) + for _id in suts_ids: + self.exec_rest_request('delete', + '{}/{}'.format(self.suts_uri, _id)) + LOG.info("\tDone for SUT id: %s", _id) + + def _check_test_servers_state(self, test_servers_ids=None, delay=10, + timeout=300): + LOG.info("Waiting for related test servers state change to READY...") + # Wait on state change + _start_time = time.time() + while time.time() - _start_time < timeout: + ts_ids_not_ready = {x['id'] for x in + self.get_test_servers(test_servers_ids) + if x['state'] != 'READY'} + if ts_ids_not_ready == set(): + break + time.sleep(delay) + else: + raise RuntimeError( + 'Test servers not in READY state after {} seconds.'.format( + timeout)) + + def create_test_servers(self, test_servers): + """ Create test servers + + :param test_servers: input data for test servers creation + mandatory fields: managementIp + optional fields: name + :type test_servers: list(dict) + """ + _ts_ids = [] + for _ts in test_servers: + _msg = 'Created test server "%(name)s"' + _ts_ids.append(self._tcl.create_test_server(_ts)) + if _ts.get('thread_model'): + _msg += ' in mode: "%(thread_model)s"' + LOG.info(_msg, _ts) + + self._check_test_servers_state(_ts_ids) + + def get_test_servers(self, test_server_ids=None): + if not test_server_ids: # Get all test servers info + _test_servers = self.exec_rest_request( + 'get', self.test_serv_uri).json()[self.test_serv_uri] + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + _test_servers = [] + for _id in test_server_ids: + _test_servers.append(self.exec_rest_request( + 'get', '{}/{}'.format(self.test_serv_uri, _id)).json()) + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + def configure_test_servers(self, action, json_data=None, + test_server_ids=None): + if not test_server_ids: + test_server_ids = [x['id'] for x in self.get_test_servers()] + elif isinstance(test_server_ids, int): + test_server_ids = [test_server_ids] + for _id in test_server_ids: + self.exec_rest_request('post', + '{}/{}'.format(self.test_serv_uri, _id), + action=action, json_data=json_data) + LOG.info("Test server (id: %s) configuration done: %s", _id, + action) + return test_server_ids + + def delete_test_servers(self, test_servers_ids=None): + # Delete test servers + for _ts in self.get_test_servers(test_servers_ids): + self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri, + _ts['id'])) + LOG.info("Deleted test server: %s", _ts['name']) + + def create_test_session(self, test_session): + # Use tcl client to create session + test_session['library'] = self._user_id + + # If no traffic duration set in test case, use predefined default value + # in session profile + test_session['duration'] = self.scenario_helper.all_options.get( + 'traffic_duration', + test_session['duration']) + + LOG.debug("Creating session='%s'", test_session['name']) + self._tcl.create_test_session(test_session) + + def get_test_session(self, test_session_name=None): + if test_session_name: + uri = 'libraries/{}/{}/{}'.format(self._user_id, + self.test_session_uri, + test_session_name) + else: + uri = self.user_lib_uri.format(self._user_id) + _test_sessions = self.exec_rest_request('get', uri).json() + return _test_sessions + + def configure_test_session(self, template_name, test_session): + # Override specified test session parameters + LOG.info('Update test session parameters: %s', test_session['name']) + test_session.update({'library': self._user_id}) + return self.exec_rest_request( + method='post', + action={'action': 'overrideAndSaveAs'}, + json_data=test_session, + resource='{}/{}'.format(self.user_lib_uri.format(self._user_id), + template_name)) + + def delete_test_session(self, test_session): + return self.exec_rest_request('delete', '{}/{}'.format( + self.user_lib_uri.format(self._user_id), test_session)) + + def create_running_tests(self, test_session_name): + r = self.exec_rest_request('post', + self.running_tests_uri, + json_data={'library': self._user_id, + 'name': test_session_name}) + if r.status_code != self.REST_STATUS_CODES['CREATED']: + raise exceptions.RestApiError('Failed to start test session.') + self.run_id = r.json()['id'] + + def get_running_tests(self, running_test_id=None): + """Get JSON structure of specified running test entity + + :param running_test_id: ID of created running test entity + :type running_test_id: int + :returns list: running tests entity + """ + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + _res = self.exec_rest_request('get', _res_name, logs=False).json() + # If no run_id specified, skip top level key in response dict. + # Else return JSON as list + return _res.get('runningTests', [_res]) + + def delete_running_tests(self, running_test_id=None): + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.get_response_params('delete', _res_name) + LOG.info("Deleted running test with id: %s", running_test_id) + + def _running_tests_action(self, running_test_id, action, json_data=None): + if not json_data: + json_data = {} + # Supported actions: + # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.exec_rest_request('post', _res_name, {'action': action}, + json_data) + LOG.debug("Executed action: '%s' on running test id: %s", action, + running_test_id) + + def stop_running_tests(self, running_test_id, json_data=None, force=False): + _action = 'abort' if force else 'stop' + self._running_tests_action(running_test_id, _action, + json_data=json_data) + LOG.info('Performed action: "%s" to test run with id: %s', _action, + running_test_id) + + def check_running_test_state(self, run_id): + r = self.exec_rest_request('get', + '{}/{}'.format(self.running_tests_uri, + run_id)) + return r.json().get("testStateOrStep") + + def get_running_tests_results(self, run_id): + _res = self.exec_rest_request( + 'get', + '{}/{}/{}'.format(self.running_tests_uri, + run_id, + 'measurements')).json() + return _res + + def _write_results(self, results): + # Avoid None value at test session start + _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0 + + _res_tabs = results.get('tabs') + # Avoid parsing 'tab' dict key initially (missing or empty) + if not _res_tabs: + return + + # Flatten nested dict holding Landslide KPIs of current test run + flat_kpis_dict = {} + for _tab, _kpis in six.iteritems(_res_tabs): + for _kpi, _value in six.iteritems(_kpis): + # Combine table name and KPI name using delimiter "::" + _key = '::'.join([_tab, _kpi]) + try: + # Cast value from str to float + # Remove comma and/or measure units, e.g. "us" + flat_kpis_dict[_key] = float( + _value.split(' ')[0].replace(',', '')) + except ValueError: # E.g. if KPI represents datetime + pass + LOG.info("Polling test results of test run id: %s. Elapsed time: %s " + "seconds", self.run_id, _elapsed_time) + return flat_kpis_dict + + def collect_kpi(self): + if 'COMPLETE' in self.check_running_test_state(self.run_id): + self._result.update({'done': True}) + return self._result + _res = self.get_running_tests_results(self.run_id) + _kpis = self._write_results(_res) + if _kpis: + _kpis.update({'run_id': int(self.run_id)}) + _kpis.update({'iteration': _res['iteration']}) + self._result.update(_kpis) + return self._result + + +class LandslideTclClient(object): + """Landslide TG TCL client class""" + + DEFAULT_TEST_NODE = { + 'ethStatsEnabled': True, + 'forcedEthInterface': '', + 'innerVlanId': 0, + 'ip': '', + 'mac': '', + 'mtu': 1500, + 'nextHop': '', + 'numLinksOrNodes': 1, + 'numVlan': 1, + 'phy': '', + 'uniqueVlanAddr': False, + 'vlanDynamic': 0, + 'vlanId': 0, + 'vlanUserPriority': 0, + 'vlanTagType': 0 + } + + TEST_NODE_CMD = \ + 'ls::create -TestNode-{} -under $p_ -Type "eth"' \ + ' -Phy "{phy}" -Ip "{ip}" -NumLinksOrNodes {numLinksOrNodes}' \ + ' -NextHop "{nextHop}" -Mac "{mac}" -MTU {mtu}' \ + ' -ForcedEthInterface "{forcedEthInterface}"' \ + ' -EthStatsEnabled {ethStatsEnabled}' \ + ' -VlanId {vlanId} -VlanUserPriority {vlanUserPriority}' \ + ' -NumVlan {numVlan} -UniqueVlanAddr {uniqueVlanAddr}' \ + ';' + + def __init__(self, tcl_handler, ts_context): + self.tcl_server_ip = None + self._user = None + self._library_id = None + self._basic_library_id = None + self._tcl = tcl_handler + self._ts_context = ts_context + self.ts_ids = set() + + # Test types names expected in session profile, test case and pod files + self._tc_types = {"SGW_Nodal", "SGW_Node", "MME_Nodal", "PGW_Node", + "PCRF_Node"} + + self._class_param_config_handler = { + "Array": self._configure_array_param, + "TestNode": self._configure_test_node_param, + "Sut": self._configure_sut_param, + "Dmf": self._configure_dmf_param + } + + def connect(self, tcl_server_ip, username, password): + """ Connect to TCL server with username and password + + :param tcl_server_ip: TCL server IP address + :type tcl_server_ip: str + :param username: existing username on TCL server + :type username: str + :param password: password related to username on TCL server + :type password: str + """ + LOG.info("connect: server='%s' user='%s'", tcl_server_ip, username) + res = self._tcl.execute( + "ls::login {} {} {}".format(tcl_server_ip, username, password)) + if 'java0x' not in res: # handle assignment reflects login success + raise exceptions.LandslideTclException( + "connect: login failed ='{}'.".format(res)) + self._library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format( + username)) + self._basic_library_id = self._get_library_id('Basic') + self.tcl_server_ip = tcl_server_ip + self._user = username + LOG.debug("connect: user='%s' me='%s' basic='%s'", self._user, + self._library_id, + self._basic_library_id) + + def disconnect(self): + """ Disconnect from TCL server. Drop TCL connection configuration """ + LOG.info("disconnect: server='%s' user='%s'", + self.tcl_server_ip, self._user) + self._tcl.execute("ls::logout") + self.tcl_server_ip = None + self._user = None + self._library_id = None + self._basic_library_id = None + + def _add_test_server(self, name, ip): + try: + # Check if test server exists with name equal to _ts_name + ts_id = int(self.resolve_test_server_name(name)) + except ValueError: + # Such test server does not exist. Attempt to create it + ts_id = self._tcl.execute( + 'ls::perform AddTs -Name "{}" -Ip "{}"'.format(name, ip)) + try: + int(ts_id) + except ValueError: + # Failed to create test server, e.g. limit reached + raise RuntimeError( + 'Failed to create test server: "{}". {}'.format(name, + ts_id)) + return ts_id + + def _update_license(self, name): + """ Setup/update test server license + + :param name: test server name + :type name: str + """ + # Retrieve current TsInfo configuration, result stored in handle "ts" + self._tcl.execute( + 'set ts [ls::retrieve TsInfo -Name "{}"]'.format(name)) + + # Set license ID, if it differs from current one, update test server + _curr_lic_id = self._tcl.execute('ls::get $ts -RequestedLicense') + if _curr_lic_id != self._ts_context.license_data['lic_id']: + self._tcl.execute('ls::config $ts -RequestedLicense {}'.format( + self._ts_context.license_data['lic_id'])) + self._tcl.execute('ls::perform ModifyTs $ts') + + def _set_thread_model(self, name, thread_model): + # Retrieve test server configuration, store it in handle "tsc" + _cfguser_password = self._ts_context.vnfd_helper['mgmt-interface'][ + 'cfguser_password'] + self._tcl.execute( + 'set tsc [ls::perform RetrieveTsConfiguration ' + '-name "{}" {}]'.format(name, _cfguser_password)) + # Configure ThreadModel, if it differs from current one + thread_model_map = {'Legacy': 'V0', + 'Max': 'V1', + 'Fireball': 'V1_FB3'} + _model = thread_model_map[thread_model] + _curr_model = self._tcl.execute('ls::get $tsc -ThreadModel') + if _curr_model != _model: + self._tcl.execute( + 'ls::config $tsc -ThreadModel "{}"'.format(_model)) + self._tcl.execute( + 'ls::perform ApplyTsConfiguration $tsc {}'.format( + _cfguser_password)) + + def create_test_server(self, test_server): + _ts_thread_model = test_server.get('thread_model') + _ts_name = test_server['name'] + + ts_id = self._add_test_server(_ts_name, test_server['ip']) + + self._update_license(_ts_name) + + # Skip below code modifying thread_model if it is not defined + if _ts_thread_model: + self._set_thread_model(_ts_name, _ts_thread_model) + + return ts_id + + def create_test_session(self, test_session): + """ Create, configure and save Landslide test session object. + + :param test_session: Landslide TestSession object + :type test_session: dict + """ + LOG.info("create_test_session: name='%s'", test_session['name']) + self._tcl.execute('set test_ [ls::create TestSession]') + self._tcl.execute('ls::config $test_ -Library {} -Name "{}"'.format( + self._library_id, test_session['name'])) + self._tcl.execute('ls::config $test_ -Description "{}"'.format( + test_session['description'])) + if 'keywords' in test_session: + self._tcl.execute('ls::config $test_ -Keywords "{}"'.format( + test_session['keywords'])) + if 'duration' in test_session: + self._tcl.execute('ls::config $test_ -Duration "{}"'.format( + test_session['duration'])) + if 'iterations' in test_session: + self._tcl.execute('ls::config $test_ -Iterations "{}"'.format( + test_session['iterations'])) + if 'reservePorts' in test_session: + if test_session['reservePorts'] == 'true': + self._tcl.execute('ls::config $test_ -Reserve Ports') + + if 'reservations' in test_session: + for _reservation in test_session['reservations']: + self._configure_reservation(_reservation) + + if 'reportOptions' in test_session: + self._configure_report_options(test_session['reportOptions']) + + for _index, _group in enumerate(test_session['tsGroups']): + self._configure_ts_group(_group, _index) + + self._save_test_session() + + def create_dmf(self, dmf): + """ Create, configure and save Landslide Data Message Flow object. + + :param dmf: Landslide Data Message Flow object + :type: dmf: dict + """ + self._tcl.execute('set dmf_ [ls::create Dmf]') + _lib_id = self._get_library_id(dmf['dmf']['library']) + self._tcl.execute('ls::config $dmf_ -Library {} -Name "{}"'.format( + _lib_id, + dmf['dmf']['name'])) + for _param_key in dmf: + if _param_key == 'dmf': + continue + _param_value = dmf[_param_key] + if isinstance(_param_value, dict): + # Configure complex parameter + _tcl_cmd = 'ls::config $dmf_' + for _sub_param_key in _param_value: + _sub_param_value = _param_value[_sub_param_key] + if isinstance(_sub_param_value, str): + _tcl_cmd += ' -{} "{}"'.format(_sub_param_key, + _sub_param_value) + else: + _tcl_cmd += ' -{} {}'.format(_sub_param_key, + _sub_param_value) + + self._tcl.execute(_tcl_cmd) + else: + # Configure simple parameter + if isinstance(_param_value, str): + self._tcl.execute( + 'ls::config $dmf_ -{} "{}"'.format(_param_key, + _param_value)) + else: + self._tcl.execute( + 'ls::config $dmf_ -{} {}'.format(_param_key, + _param_value)) + self._save_dmf() + + def configure_dmf(self, dmf): + # Use create to reconfigure and overwrite existing dmf + self.create_dmf(dmf) + + def delete_dmf(self, dmf): + raise NotImplementedError + + def _save_dmf(self): + # Call 'Validate' to set default values for missing parameters + res = self._tcl.execute('ls::perform Validate -Dmf $dmf_') + if res == 'Invalid': + res = self._tcl.execute('ls::get $dmf_ -ErrorsAndWarnings') + LOG.error("_save_dmf: %s", res) + raise exceptions.LandslideTclException("_save_dmf: {}".format(res)) + else: + res = self._tcl.execute('ls::save $dmf_ -overwrite') + LOG.debug("_save_dmf: result (%s)", res) + + def _configure_report_options(self, options): + for _option_key in options: + _option_value = options[_option_key] + if _option_key == 'format': + _format = 0 + if _option_value == 'CSV': + _format = 1 + self._tcl.execute( + 'ls::config $test_.ReportOptions -Format {} ' + '-Ts -3 -Tc -3'.format(_format)) + else: + self._tcl.execute( + 'ls::config $test_.ReportOptions -{} {}'.format( + _option_key, + _option_value)) + + def _configure_ts_group(self, ts_group, ts_group_index): + try: + _ts_id = int(self.resolve_test_server_name(ts_group['tsId'])) + except ValueError: + raise RuntimeError('Test server name "{}" does not exist.'.format( + ts_group['tsId'])) + if _ts_id not in self.ts_ids: + self._tcl.execute( + 'set tss_ [ls::create TsGroup -under $test_ -tsId {} ]'.format( + _ts_id)) + self.ts_ids.add(_ts_id) + for _case in ts_group.get('testCases', []): + self._configure_tc_type(_case, ts_group_index) + + self._configure_preresolved_arp(ts_group.get('preResolvedArpAddress')) + + def _configure_tc_type(self, tc, ts_group_index): + if tc['type'] not in self._tc_types: + raise RuntimeError('Test type {} not supported.'.format( + tc['type'])) + tc['type'] = tc['type'].replace('_', ' ') + res = self._tcl.execute( + 'set tc_ [ls::retrieve testcase -libraryId {0} "{1}"]'.format( + self._basic_library_id, tc['type'])) + if 'Invalid' in res: + raise RuntimeError('Test type {} not found in "Basic" ' + 'library.'.format(tc['type'])) + self._tcl.execute( + 'ls::config $test_.TsGroup({}) -children-Tc $tc_'.format( + ts_group_index)) + self._tcl.execute('ls::config $tc_ -Library {0} -Name "{1}"'.format( + self._basic_library_id, tc['name'])) + self._tcl.execute( + 'ls::config $tc_ -Description "{}"'.format(tc['type'])) + self._tcl.execute( + 'ls::config $tc_ -Keywords "GTP LTE {}"'.format(tc['type'])) + if 'linked' in tc: + self._tcl.execute( + 'ls::config $tc_ -Linked {}'.format(tc['linked'])) + if 'AssociatedPhys' in tc: + self._tcl.execute('ls::config $tc_ -AssociatedPhys "{}"'.format( + tc['AssociatedPhys'])) + if 'parameters' in tc: + self._configure_parameters(tc['parameters']) + + def _configure_parameters(self, params): + self._tcl.execute('set p_ [ls::get $tc_ -children-Parameters(0)]') + for _param_key in sorted(params): + _param_value = params[_param_key] + if isinstance(_param_value, dict): + # Configure complex parameter + if _param_value['class'] in self._class_param_config_handler: + self._class_param_config_handler[_param_value['class']]( + _param_key, + _param_value) + else: + # Configure simple parameter + self._tcl.execute( + 'ls::create {} -under $p_ -Value "{}"'.format( + _param_key, + _param_value)) + + def _configure_array_param(self, name, params): + self._tcl.execute('ls::create -Array-{} -under $p_ ;'.format(name)) + for param in params['array']: + self._tcl.execute( + 'ls::create ArrayItem -under $p_.{} -Value "{}"'.format(name, + param)) + + def _configure_test_node_param(self, name, params): + _params = self.DEFAULT_TEST_NODE + _params.update(params) + + # TCL command expects lower case 'true' or 'false' + _params['ethStatsEnabled'] = str(_params['ethStatsEnabled']).lower() + _params['uniqueVlanAddr'] = str(_params['uniqueVlanAddr']).lower() + + cmd = self.TEST_NODE_CMD.format(name, **_params) + self._tcl.execute(cmd) + + def _configure_sut_param(self, name, params): + self._tcl.execute( + 'ls::create -Sut-{} -under $p_ -Name "{}";'.format(name, + params['name'])) + + def _configure_dmf_param(self, name, params): + self._tcl.execute('ls::create -Dmf-{} -under $p_ ;'.format(name)) + + for _flow_index, _flow in enumerate(params['mainflows']): + _lib_id = self._get_library_id(_flow['library']) + self._tcl.execute( + 'ls::perform AddDmfMainflow $p_.Dmf {} "{}"'.format( + _lib_id, + _flow['name'])) + + if not params.get('instanceGroups'): + return + + _instance_group = params['instanceGroups'][_flow_index] + + # Traffic Mixer parameters handling + for _key in ['mixType', 'rate']: + if _key in _instance_group: + self._tcl.execute( + 'ls::config $p_.Dmf.InstanceGroup({}) -{} {}'.format( + _flow_index, _key, _instance_group[_key])) + + # Assignments parameters handling + for _row_id, _row in enumerate(_instance_group.get('rows', [])): + self._tcl.execute( + 'ls::config $p_.Dmf.InstanceGroup({}).Row({}) -Node {} ' + '-OverridePort {} -ClientPort {} -Context {} -Role {} ' + '-PreferredTransport {} -RatingGroup {} ' + '-ServiceID {}'.format( + _flow_index, _row_id, _row['node'], + _row['overridePort'], _row['clientPort'], + _row['context'], _row['role'], _row['transport'], + _row['ratingGroup'], _row['serviceId'])) + + def _configure_reservation(self, reservation): + _ts_id = self.resolve_test_server_name(reservation['tsId']) + self._tcl.execute( + 'set reservation_ [ls::create Reservation -under $test_]') + self._tcl.execute( + 'ls::config $reservation_ -TsIndex {} -TsId {} ' + '-TsName "{}"'.format(reservation['tsIndex'], + _ts_id, + reservation['tsName'])) + for _subnet in reservation['phySubnets']: + self._tcl.execute( + 'set physubnet_ [ls::create PhySubnet -under $reservation_]') + self._tcl.execute( + 'ls::config $physubnet_ -Name "{}" -Base "{}" -Mask "{}" ' + '-NumIps {}'.format(_subnet['name'], _subnet['base'], + _subnet['mask'], _subnet['numIps'])) + + def _configure_preresolved_arp(self, pre_resolved_arp): + if not pre_resolved_arp: # Pre-resolved ARP configuration not found + return + for _entry in pre_resolved_arp: + # TsGroup handle name should correspond in _configure_ts_group() + self._tcl.execute( + 'ls::create PreResolvedArpAddress -under $tss_ ' + '-StartingAddress "{StartingAddress}" ' + '-NumNodes {NumNodes}'.format(**_entry)) + + def delete_test_session(self, test_session): + raise NotImplementedError + + def _save_test_session(self): + # Call 'Validate' to set default values for missing parameters + res = self._tcl.execute('ls::perform Validate -TestSession $test_') + if res == 'Invalid': + res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings') + raise exceptions.LandslideTclException( + "Test session validation failed. Server response: {}".format( + res)) + else: + self._tcl.execute('ls::save $test_ -overwrite') + LOG.debug("Test session saved successfully.") + + def _get_library_id(self, library): + _library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -systemLibraryName {}] -Id".format( + library)) + try: + int(_library_id) + return _library_id + except ValueError: + pass + + _library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format( + library)) + try: + int(_library_id) + except ValueError: + LOG.error("_get_library_id: library='%s' not found.", library) + raise exceptions.LandslideTclException( + "_get_library_id: library='{}' not found.".format( + library)) + + return _library_id + + def resolve_test_server_name(self, ts_name): + return self._tcl.execute("ls::query TsId {}".format(ts_name)) + + +class LsTclHandler(object): + """Landslide TCL Handler class""" + + LS_OK = "ls_ok" + JRE_PATH = net_serv_utils.get_nsb_option('jre_path_i386') + + def __init__(self): + self.tcl_cmds = {} + self._ls = LsApi(jre_path=self.JRE_PATH) + self._ls.tcl( + "ls::config ApiOptions -NoReturnSuccessResponseString '{}'".format( + self.LS_OK)) + + def execute(self, command): + res = self._ls.tcl(command) + self.tcl_cmds[command] = res + return res diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py index a989543f5..5c8819119 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py b/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py new file mode 100644 index 000000000..5da2178af --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py @@ -0,0 +1,88 @@ +# Copyright (c) 2018-2019 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from yardstick.common import constants +from yardstick.common import exceptions +from yardstick.common import utils +from yardstick.network_services.vnf_generic.vnf import base as vnf_base + + +LOG = logging.getLogger(__name__) + + +class PktgenTrafficGen(vnf_base.GenericTrafficGen): + """DPDK Pktgen traffic generator + + Website: http://pktgen-dpdk.readthedocs.io/en/latest/index.html + """ + + TIMEOUT = 30 + + def __init__(self, name, vnfd): + vnf_base.GenericTrafficGen.__init__(self, name, vnfd) + self._traffic_profile = None + self._node_ip = vnfd['mgmt-interface'].get('ip') + self._lua_node_port = self._get_lua_node_port( + vnfd['mgmt-interface'].get('service_ports', [])) + self._rate = 1 + + def instantiate(self, scenario_cfg, context_cfg): # pragma: no cover + pass + + def run_traffic(self, traffic_profile): + self._traffic_profile = traffic_profile + self._traffic_profile.init(self._node_ip, self._lua_node_port) + utils.wait_until_true(self._is_running, timeout=self.TIMEOUT, + sleep=2) + + def terminate(self): # pragma: no cover + pass + + def collect_kpi(self): # pragma: no cover + pass + + def scale(self, flavor=''): # pragma: no cover + pass + + def wait_for_instantiate(self): # pragma: no cover + pass + + def runner_method_start_iteration(self): + # pragma: no cover + LOG.debug('Start method') + # NOTE(ralonsoh): 'rate' should be modified between iterations. The + # current implementation is just for testing. + self._rate += 1 + self._traffic_profile.start() + self._traffic_profile.rate(self._rate) + time.sleep(4) + self._traffic_profile.stop() + + @staticmethod + def _get_lua_node_port(service_ports): + for port in (port for port in service_ports if + int(port['port']) == constants.LUA_PORT): + return int(port['node_port']) + # NOTE(ralonsoh): in case LUA port is not present, an exception should + # be raised. + + def _is_running(self): + try: + self._traffic_profile.help() + return True + except exceptions.PktgenActionError: + return False diff --git a/yardstick/network_services/vnf_generic/vnf/tg_prox.py b/yardstick/network_services/vnf_generic/vnf/tg_prox.py index 151252ce8..65b7bac10 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_prox.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_prox.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017 Intel Corporation +# Copyright (c) 2017-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,9 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import - import logging +import copy from yardstick.network_services.utils import get_nsb_option from yardstick.network_services.vnf_generic.vnf.prox_vnf import ProxApproxVnf @@ -30,23 +29,13 @@ class ProxTrafficGen(SampleVNFTrafficGen): LUA_PARAMETER_NAME = "gen" WAIT_TIME = 1 - @staticmethod - def _sort_vpci(vnfd): - """ - - :param vnfd: vnfd.yaml - :return: trex_cfg.yaml file - """ - - def key_func(interface): - return interface["virtual-interface"]["vpci"], interface["name"] + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + vnfd_cpy = copy.deepcopy(vnfd) + super(ProxTrafficGen, self).__init__(name, vnfd_cpy) - ext_intf = vnfd["vdu"][0]["external-interface"] - return sorted(ext_intf, key=key_func) - - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - # don't call superclass, use custom wrapper of ProxApproxVnf - self._vnf_wrapper = ProxApproxVnf(name, vnfd, setup_env_helper_type, resource_helper_type) + self._vnf_wrapper = ProxApproxVnf( + name, vnfd, setup_env_helper_type, resource_helper_type) self.bin_path = get_nsb_option('bin_path', '') self.name = self._vnf_wrapper.name self.ssh_helper = self._vnf_wrapper.ssh_helper @@ -59,10 +48,6 @@ class ProxTrafficGen(SampleVNFTrafficGen): self._tg_process = None self._traffic_process = None - # used for generating stats - self.vpci_if_name_ascending = self._sort_vpci(vnfd) - self.resource_helper.vpci_if_name_ascending = self._sort_vpci(vnfd) - def terminate(self): self._vnf_wrapper.terminate() super(ProxTrafficGen, self).terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py index 630c8b9c0..80812876d 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,31 +12,624 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import - -import time -import os +import ipaddress import logging -import sys +import six +import collections -from yardstick.common.utils import ErrorClass +from six import moves +from yardstick.common import utils +from yardstick.common import exceptions +from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper -from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file + LOG = logging.getLogger(__name__) WAIT_AFTER_CFG_LOAD = 10 WAIT_FOR_TRAFFIC = 30 -IXIA_LIB = os.path.dirname(os.path.realpath(__file__)) -IXNET_LIB = os.path.join(IXIA_LIB, "../../libs/ixia_libs/IxNet") -sys.path.append(IXNET_LIB) +WAIT_PROTOCOLS_STARTED = 420 + + +class IxiaBasicScenario(object): + """Ixia Basic scenario for flow from port to port""" + + def __init__(self, client, context_cfg, ixia_cfg): + + self.client = client + self.context_cfg = context_cfg + self.ixia_cfg = ixia_cfg + + self._uplink_vports = None + self._downlink_vports = None + + def apply_config(self): + pass + + def run_protocols(self): + pass + + def stop_protocols(self): + pass + + def create_traffic_model(self, traffic_profile): + vports = self.client.get_vports() + self._uplink_vports = vports[::2] + self._downlink_vports = vports[1::2] + self.client.create_traffic_model(self._uplink_vports, + self._downlink_vports, + traffic_profile) + + def _get_stats(self): + return self.client.get_statistics() + + def generate_samples(self, resource_helper, ports, duration): + stats = self._get_stats() + + samples = {} + # this is not DPDK port num, but this is whatever number we gave + # when we selected ports and programmed the profile + for port_num in ports: + try: + # reverse lookup port name from port_num so the stats dict is descriptive + intf = resource_helper.vnfd_helper.find_interface_by_port(port_num) + port_name = intf['name'] + avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num] + min_latency = stats['Store-Forward_Min_latency_ns'][port_num] + max_latency = stats['Store-Forward_Max_latency_ns'][port_num] + samples[port_name] = { + 'RxThroughputBps': float(stats['Bytes_Rx'][port_num]) / duration, + 'TxThroughputBps': float(stats['Bytes_Tx'][port_num]) / duration, + 'InPackets': int(stats['Valid_Frames_Rx'][port_num]), + 'OutPackets': int(stats['Frames_Tx'][port_num]), + 'InBytes': int(stats['Bytes_Rx'][port_num]), + 'OutBytes': int(stats['Bytes_Tx'][port_num]), + 'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration, + 'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration, + 'LatencyAvg': utils.safe_cast(avg_latency, int, 0), + 'LatencyMin': utils.safe_cast(min_latency, int, 0), + 'LatencyMax': utils.safe_cast(max_latency, int, 0) + } + except IndexError: + pass + + return samples + + def update_tracking_options(self): + pass + + def get_tc_rfc2544_options(self): + pass + + +class IxiaL3Scenario(IxiaBasicScenario): + """Ixia scenario for L3 flow between static ip's""" + + def _add_static_ips(self): + vports = self.client.get_vports() + uplink_intf_vport = [(self.client.get_static_interface(vport), vport) + for vport in vports[::2]] + downlink_intf_vport = [(self.client.get_static_interface(vport), vport) + for vport in vports[1::2]] + + for index in range(len(uplink_intf_vport)): + intf, vport = uplink_intf_vport[index] + try: + iprange = self.ixia_cfg['flow'].get('src_ip')[index] + start_ip = utils.get_ip_range_start(iprange) + count = utils.get_ip_range_count(iprange) + self.client.add_static_ipv4(intf, vport, start_ip, count, '32') + except IndexError: + raise exceptions.IncorrectFlowOption( + option="src_ip", link="uplink_{}".format(index)) + + intf, vport = downlink_intf_vport[index] + try: + iprange = self.ixia_cfg['flow'].get('dst_ip')[index] + start_ip = utils.get_ip_range_start(iprange) + count = utils.get_ip_range_count(iprange) + self.client.add_static_ipv4(intf, vport, start_ip, count, '32') + except IndexError: + raise exceptions.IncorrectFlowOption( + option="dst_ip", link="downlink_{}".format(index)) + + def _add_interfaces(self): + vports = self.client.get_vports() + uplink_vports = (vport for vport in vports[::2]) + downlink_vports = (vport for vport in vports[1::2]) + + ix_node = next(node for _, node in self.context_cfg['nodes'].items() + if node['role'] == 'IxNet') + + for intf in ix_node['interfaces'].values(): + ip = intf.get('local_ip') + mac = intf.get('local_mac') + gateway = None + try: + gateway = next(route.get('gateway') + for route in ix_node.get('routing_table') + if route.get('if') == intf.get('ifname')) + except StopIteration: + LOG.debug("Gateway not provided") + + if 'uplink' in intf.get('vld_id'): + self.client.add_interface(next(uplink_vports), + ip, mac, gateway) + else: + self.client.add_interface(next(downlink_vports), + ip, mac, gateway) + + def apply_config(self): + self._add_interfaces() + self._add_static_ips() + + def create_traffic_model(self, traffic_profile): + vports = self.client.get_vports() + self._uplink_vports = vports[::2] + self._downlink_vports = vports[1::2] + + uplink_endpoints = [port + '/protocols/static' + for port in self._uplink_vports] + downlink_endpoints = [port + '/protocols/static' + for port in self._downlink_vports] + + self.client.create_ipv4_traffic_model(uplink_endpoints, + downlink_endpoints, + traffic_profile) + + +class IxiaPppoeClientScenario(object): + def __init__(self, client, context_cfg, ixia_cfg): + + self.client = client + + self._uplink_vports = None + self._downlink_vports = None + + self._access_topologies = [] + self._core_topologies = [] + + self._context_cfg = context_cfg + self._ixia_cfg = ixia_cfg + self.protocols = [] + self.device_groups = [] + + def apply_config(self): + vports = self.client.get_vports() + self._uplink_vports = vports[::2] + self._downlink_vports = vports[1::2] + self._fill_ixia_config() + self._apply_access_network_config() + self._apply_core_network_config() + + def create_traffic_model(self, traffic_profile): + endpoints_id_pairs = self._get_endpoints_src_dst_id_pairs( + traffic_profile.full_profile) + endpoints_obj_pairs = \ + self._get_endpoints_src_dst_obj_pairs(endpoints_id_pairs) + if endpoints_obj_pairs: + uplink_endpoints = endpoints_obj_pairs[::2] + downlink_endpoints = endpoints_obj_pairs[1::2] + else: + uplink_endpoints = self._access_topologies + downlink_endpoints = self._core_topologies + self.client.create_ipv4_traffic_model(uplink_endpoints, + downlink_endpoints, + traffic_profile) + + def run_protocols(self): + LOG.info('PPPoE Scenario - Start Protocols') + self.client.start_protocols() + utils.wait_until_true( + lambda: self.client.is_protocols_running(self.protocols), + timeout=WAIT_PROTOCOLS_STARTED, sleep=2) + + def stop_protocols(self): + LOG.info('PPPoE Scenario - Stop Protocols') + self.client.stop_protocols() + + def _get_intf_addr(self, intf): + """Retrieve interface IP address and mask + + :param intf: could be the string which represents IP address + with mask (e.g 192.168.10.2/24) or a dictionary with the host + name and the port (e.g. {'tg__0': 'xe1'}) + :return: (tuple) pair of ip address and mask + """ + if isinstance(intf, six.string_types): + ip, mask = tuple(intf.split('/')) + return ip, int(mask) + + node_name, intf_name = next(iter(intf.items())) + node = self._context_cfg["nodes"].get(node_name, {}) + interface = node.get("interfaces", {})[intf_name] + ip = interface["local_ip"] + mask = interface["netmask"] + ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)), + strict=False) + return ip, ipaddr.prefixlen + + @staticmethod + def _get_endpoints_src_dst_id_pairs(flows_params): + """Get list of flows src/dst port pairs + + Create list of flows src/dst port pairs based on traffic profile + flows data. Each uplink/downlink pair in traffic profile represents + specific flows between the pair of ports. + + Example ('port' key represents port on which flow will be created): + + Input flows data: + uplink_0: + ipv4: + id: 1 + port: xe0 + downlink_0: + ipv4: + id: 2 + port: xe1 + uplink_1: + ipv4: + id: 3 + port: xe2 + downlink_1: + ipv4: + id: 4 + port: xe3 + + Result list: ['xe0', 'xe1', 'xe2', 'xe3'] + + Result list means that the following flows pairs will be created: + - uplink 0: port xe0 <-> port xe1 + - downlink 0: port xe1 <-> port xe0 + - uplink 1: port xe2 <-> port xe3 + - downlink 1: port xe3 <-> port xe2 + + :param flows_params: ordered dict of traffic profile flows params + :return: (list) list of flows src/dst ports + """ + if len(flows_params) % 2: + raise RuntimeError('Number of uplink/downlink pairs' + ' in traffic profile is not equal') + endpoint_pairs = [] + for flow in flows_params: + port = flows_params[flow]['ipv4'].get('port') + if port is None: + continue + endpoint_pairs.append(port) + return endpoint_pairs + + def _get_endpoints_src_dst_obj_pairs(self, endpoints_id_pairs): + """Create list of uplink/downlink device groups pairs + + Based on traffic profile options, create list of uplink/downlink + device groups pairs between which flow groups will be created: + + 1. In case uplink/downlink flows in traffic profile doesn't have + specified 'port' key, flows will be created between topologies + on corresponding access and core port. + E.g.: + Access topology on xe0: topology1 + Core topology on xe1: topology2 + Flows will be created between: + topology1 -> topology2 + topology2 -> topology1 + + 2. In case uplink/downlink flows in traffic profile have specified + 'port' key, flows will be created between device groups on this + port. + E.g., for the following traffic profile + uplink_0: + port: xe0 + downlink_0: + port: xe1 + uplink_1: + port: xe0 + downlink_0: + port: xe3 + Flows will be created between: + Port xe0 (dg1) -> Port xe1 (dg1) + Port xe1 (dg1) -> Port xe0 (dg1) + Port xe0 (dg2) -> Port xe3 (dg1) + Port xe3 (dg3) -> Port xe0 (dg1) + + :param endpoints_id_pairs: (list) List of uplink/downlink flows ports + pairs + :return: (list) list of uplink/downlink device groups descriptors pairs + """ + pppoe = self._ixia_cfg['pppoe_client'] + sessions_per_port = pppoe['sessions_per_port'] + sessions_per_svlan = pppoe['sessions_per_svlan'] + svlan_count = int(sessions_per_port / sessions_per_svlan) + + uplink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['src_ip']] + downlink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['dst_ip']] + uplink_port_topology_map = zip(uplink_ports, self._access_topologies) + downlink_port_topology_map = zip(downlink_ports, self._core_topologies) + + port_to_dev_group_mapping = {} + for port, topology in uplink_port_topology_map: + topology_dgs = self.client.get_topology_device_groups(topology) + port_to_dev_group_mapping[port] = topology_dgs + for port, topology in downlink_port_topology_map: + topology_dgs = self.client.get_topology_device_groups(topology) + port_to_dev_group_mapping[port] = topology_dgs + + uplink_endpoints = endpoints_id_pairs[::2] + downlink_endpoints = endpoints_id_pairs[1::2] + + uplink_dev_groups = [] + group_up = [uplink_endpoints[i:i + svlan_count] + for i in range(0, len(uplink_endpoints), svlan_count)] + + for group in group_up: + for i, port in enumerate(group): + uplink_dev_groups.append(port_to_dev_group_mapping[port][i]) + + downlink_dev_groups = [] + for port in downlink_endpoints: + downlink_dev_groups.append(port_to_dev_group_mapping[port][0]) + + endpoint_obj_pairs = [] + [endpoint_obj_pairs.extend([up, down]) + for up, down in zip(uplink_dev_groups, downlink_dev_groups)] + + return endpoint_obj_pairs + + def _fill_ixia_config(self): + pppoe = self._ixia_cfg["pppoe_client"] + ipv4 = self._ixia_cfg["ipv4_client"] + + _ip = [self._get_intf_addr(intf)[0] for intf in pppoe["ip"]] + self._ixia_cfg["pppoe_client"]["ip"] = _ip + + _ip = [self._get_intf_addr(intf)[0] for intf in ipv4["gateway_ip"]] + self._ixia_cfg["ipv4_client"]["gateway_ip"] = _ip + + addrs = [self._get_intf_addr(intf) for intf in ipv4["ip"]] + _ip = [addr[0] for addr in addrs] + _prefix = [addr[1] for addr in addrs] + + self._ixia_cfg["ipv4_client"]["ip"] = _ip + self._ixia_cfg["ipv4_client"]["prefix"] = _prefix + + def _apply_access_network_config(self): + pppoe = self._ixia_cfg["pppoe_client"] + sessions_per_port = pppoe['sessions_per_port'] + sessions_per_svlan = pppoe['sessions_per_svlan'] + svlan_count = int(sessions_per_port / sessions_per_svlan) + + # add topology per uplink port (access network) + for access_tp_id, vport in enumerate(self._uplink_vports): + name = 'Topology access {}'.format(access_tp_id) + tp = self.client.add_topology(name, vport) + self._access_topologies.append(tp) + # add device group per svlan + for dg_id in range(svlan_count): + s_vlan_id = int(pppoe['s_vlan']) + dg_id + access_tp_id * svlan_count + s_vlan = ixnet_api.Vlan(vlan_id=s_vlan_id) + c_vlan = ixnet_api.Vlan(vlan_id=pppoe['c_vlan'], vlan_id_step=1) + name = 'SVLAN {}'.format(s_vlan_id) + dg = self.client.add_device_group(tp, name, sessions_per_svlan) + self.device_groups.append(dg) + # add ethernet layer to device group + ethernet = self.client.add_ethernet(dg, 'Ethernet') + self.protocols.append(ethernet) + self.client.add_vlans(ethernet, [s_vlan, c_vlan]) + # add ppp over ethernet + if 'pap_user' in pppoe: + ppp = self.client.add_pppox_client(ethernet, 'pap', + pppoe['pap_user'], + pppoe['pap_password']) + else: + ppp = self.client.add_pppox_client(ethernet, 'chap', + pppoe['chap_user'], + pppoe['chap_password']) + self.protocols.append(ppp) + + def _apply_core_network_config(self): + ipv4 = self._ixia_cfg["ipv4_client"] + sessions_per_port = ipv4['sessions_per_port'] + sessions_per_vlan = ipv4['sessions_per_vlan'] + vlan_count = int(sessions_per_port / sessions_per_vlan) + + # add topology per downlink port (core network) + for core_tp_id, vport in enumerate(self._downlink_vports): + name = 'Topology core {}'.format(core_tp_id) + tp = self.client.add_topology(name, vport) + self._core_topologies.append(tp) + # add device group per vlan + for dg_id in range(vlan_count): + name = 'Core port {}'.format(core_tp_id) + dg = self.client.add_device_group(tp, name, sessions_per_vlan) + self.device_groups.append(dg) + # add ethernet layer to device group + ethernet = self.client.add_ethernet(dg, 'Ethernet') + self.protocols.append(ethernet) + if 'vlan' in ipv4: + vlan_id = int(ipv4['vlan']) + dg_id + core_tp_id * vlan_count + vlan = ixnet_api.Vlan(vlan_id=vlan_id) + self.client.add_vlans(ethernet, [vlan]) + # add ipv4 layer + gw_ip = ipv4['gateway_ip'][core_tp_id] + # use gw addr to generate ip addr from the same network + ip_addr = ipaddress.IPv4Address(gw_ip) + 1 + ipv4_obj = self.client.add_ipv4(ethernet, name='ipv4', + addr=ip_addr, + addr_step='0.0.0.1', + prefix=ipv4['prefix'][core_tp_id], + gateway=gw_ip) + self.protocols.append(ipv4_obj) + if ipv4.get("bgp"): + bgp_peer_obj = self.client.add_bgp(ipv4_obj, + dut_ip=ipv4["bgp"]["dut_ip"], + local_as=ipv4["bgp"]["as_number"], + bgp_type=ipv4["bgp"].get("bgp_type")) + self.protocols.append(bgp_peer_obj) + + def update_tracking_options(self): + priority_map = { + 'raw': 'ipv4Raw0', + 'tos': {'precedence': 'ipv4Precedence0'}, + 'dscp': {'defaultPHB': 'ipv4DefaultPhb0', + 'selectorPHB': 'ipv4ClassSelectorPhb0', + 'assuredPHB': 'ipv4AssuredForwardingPhb0', + 'expeditedPHB': 'ipv4ExpeditedForwardingPhb0'} + } + + prio_trackby_key = 'ipv4Precedence0' + + try: + priority = list(self._ixia_cfg['priority'])[0] + if priority == 'raw': + prio_trackby_key = priority_map[priority] + elif priority in ['tos', 'dscp']: + priority_type = list(self._ixia_cfg['priority'][priority])[0] + prio_trackby_key = priority_map[priority][priority_type] + except KeyError: + pass + + tracking_options = ['flowGroup0', 'vlanVlanId0', prio_trackby_key] + self.client.set_flow_tracking(tracking_options) + + def get_tc_rfc2544_options(self): + return self._ixia_cfg.get('rfc2544') + + def _get_stats(self): + return self.client.get_pppoe_scenario_statistics() + + @staticmethod + def get_flow_id_data(stats, flow_id, key): + result = [float(flow.get(key)) for flow in stats if flow['id'] == flow_id] + return sum(result) / len(result) + + def get_priority_flows_stats(self, samples, duration): + results = {} + priorities = set([flow['IP_Priority'] for flow in samples]) + for priority in priorities: + tx_frames = sum( + [int(flow['Tx_Frames']) for flow in samples + if flow['IP_Priority'] == priority]) + rx_frames = sum( + [int(flow['Rx_Frames']) for flow in samples + if flow['IP_Priority'] == priority]) + prio_flows_num = len([flow for flow in samples + if flow['IP_Priority'] == priority]) + avg_latency_ns = sum( + [int(flow['Store-Forward_Avg_latency_ns']) for flow in samples + if flow['IP_Priority'] == priority]) / prio_flows_num + min_latency_ns = min( + [int(flow['Store-Forward_Min_latency_ns']) for flow in samples + if flow['IP_Priority'] == priority]) + max_latency_ns = max( + [int(flow['Store-Forward_Max_latency_ns']) for flow in samples + if flow['IP_Priority'] == priority]) + tx_throughput = float(tx_frames) / duration + rx_throughput = float(rx_frames) / duration + results[priority] = { + 'InPackets': rx_frames, + 'OutPackets': tx_frames, + 'RxThroughput': round(rx_throughput, 3), + 'TxThroughput': round(tx_throughput, 3), + 'LatencyAvg': utils.safe_cast(avg_latency_ns, int, 0), + 'LatencyMin': utils.safe_cast(min_latency_ns, int, 0), + 'LatencyMax': utils.safe_cast(max_latency_ns, int, 0) + } + return results + + def generate_samples(self, resource_helper, ports, duration): + + stats = self._get_stats() + samples = {} + ports_stats = stats['port_statistics'] + flows_stats = stats['flow_statistic'] + pppoe_subs_per_port = stats['pppox_client_per_port'] + + # Get sorted list of ixia ports names + ixia_port_names = sorted([data['port_name'] for data in ports_stats]) + + # Set 'port_id' key for ports stats items + for item in ports_stats: + port_id = item.pop('port_name').split('-')[-1].strip() + item['port_id'] = int(port_id) + + # Set 'id' key for flows stats items + for item in flows_stats: + flow_id = item.pop('Flow_Group').split('-')[1].strip() + item['id'] = int(flow_id) + + # Set 'port_id' key for pppoe subs per port stats + for item in pppoe_subs_per_port: + port_id = item.pop('subs_port').split('-')[-1].strip() + item['port_id'] = int(port_id) + + # Map traffic flows to ports + port_flow_map = collections.defaultdict(set) + for item in flows_stats: + tx_port = item.pop('Tx_Port') + tx_port_index = ixia_port_names.index(tx_port) + port_flow_map[tx_port_index].update([item['id']]) + + # Sort ports stats + ports_stats = sorted(ports_stats, key=lambda k: k['port_id']) + + # Get priority flows stats + prio_flows_stats = self.get_priority_flows_stats(flows_stats, duration) + samples['priority_stats'] = prio_flows_stats + + # this is not DPDK port num, but this is whatever number we gave + # when we selected ports and programmed the profile + for port_num in ports: + try: + # reverse lookup port name from port_num so the stats dict is descriptive + intf = resource_helper.vnfd_helper.find_interface_by_port(port_num) + port_name = intf['name'] + port_id = ports_stats[port_num]['port_id'] + port_subs_stats = \ + [port_data for port_data in pppoe_subs_per_port + if port_data.get('port_id') == port_id] + + avg_latency = \ + sum([float(self.get_flow_id_data( + flows_stats, flow, 'Store-Forward_Avg_latency_ns')) + for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num]) + min_latency = \ + min([float(self.get_flow_id_data( + flows_stats, flow, 'Store-Forward_Min_latency_ns')) + for flow in port_flow_map[port_num]]) + max_latency = \ + max([float(self.get_flow_id_data( + flows_stats, flow, 'Store-Forward_Max_latency_ns')) + for flow in port_flow_map[port_num]]) + + samples[port_name] = { + 'RxThroughputBps': float(ports_stats[port_num]['Bytes_Rx']) / duration, + 'TxThroughputBps': float(ports_stats[port_num]['Bytes_Tx']) / duration, + 'InPackets': int(ports_stats[port_num]['Valid_Frames_Rx']), + 'OutPackets': int(ports_stats[port_num]['Frames_Tx']), + 'InBytes': int(ports_stats[port_num]['Bytes_Rx']), + 'OutBytes': int(ports_stats[port_num]['Bytes_Tx']), + 'RxThroughput': float(ports_stats[port_num]['Valid_Frames_Rx']) / duration, + 'TxThroughput': float(ports_stats[port_num]['Frames_Tx']) / duration, + 'LatencyAvg': utils.safe_cast(avg_latency, int, 0), + 'LatencyMin': utils.safe_cast(min_latency, int, 0), + 'LatencyMax': utils.safe_cast(max_latency, int, 0) + } + + if port_subs_stats: + samples[port_name].update( + {'SessionsUp': int(port_subs_stats[0]['Sessions_Up']), + 'SessionsDown': int(port_subs_stats[0]['Sessions_Down']), + 'SessionsNotStarted': int(port_subs_stats[0]['Sessions_Not_Started']), + 'SessionsTotal': int(port_subs_stats[0]['Sessions_Total'])} + ) + + except IndexError: + pass -try: - from IxNet import IxNextgen -except ImportError: - IxNextgen = ErrorClass + return samples class IxiaRfc2544Helper(Rfc2544ResourceHelper): @@ -53,7 +646,13 @@ class IxiaResourceHelper(ClientResourceHelper): super(IxiaResourceHelper, self).__init__(setup_helper) self.scenario_helper = setup_helper.scenario_helper - self.client = IxNextgen() + self._ixia_scenarios = { + "IxiaBasic": IxiaBasicScenario, + "IxiaL3": IxiaL3Scenario, + "IxiaPppoeClient": IxiaPppoeClientScenario, + } + + self.client = ixnet_api.IxNextgen() if rfc_helper_type is None: rfc_helper_type = IxiaRfc2544Helper @@ -61,54 +660,45 @@ class IxiaResourceHelper(ClientResourceHelper): self.rfc_helper = rfc_helper_type(self.scenario_helper) self.uplink_ports = None self.downlink_ports = None + self.context_cfg = None + self._ix_scenario = None self._connect() def _connect(self, client=None): - self.client._connect(self.vnfd_helper) + self.client.connect(self.vnfd_helper) - def get_stats(self, *args, **kwargs): - return self.client.ix_get_statistics() + def setup(self): + super(IxiaResourceHelper, self).setup() + self._init_ix_scenario() def stop_collect(self): + self._ix_scenario.stop_protocols() self._terminated.value = 1 - if self.client: - self.client.ix_stop_traffic() - def generate_samples(self, ports, key=None, default=None): - stats = self.get_stats() - last_result = stats[1] - latency = stats[0] + def generate_samples(self, ports, duration): + return self._ix_scenario.generate_samples(self, ports, duration) - samples = {} - # this is not DPDK port num, but this is whatever number we gave - # when we selected ports and programmed the profile - for port_num in ports: - try: - # reverse lookup port name from port_num so the stats dict is descriptive - intf = self.vnfd_helper.find_interface_by_port(port_num) - port_name = intf["name"] - samples[port_name] = { - "rx_throughput_kps": float(last_result["Rx_Rate_Kbps"][port_num]), - "tx_throughput_kps": float(last_result["Tx_Rate_Kbps"][port_num]), - "rx_throughput_mbps": float(last_result["Rx_Rate_Mbps"][port_num]), - "tx_throughput_mbps": float(last_result["Tx_Rate_Mbps"][port_num]), - "in_packets": int(last_result["Valid_Frames_Rx"][port_num]), - "out_packets": int(last_result["Frames_Tx"][port_num]), - "RxThroughput": int(last_result["Valid_Frames_Rx"][port_num]) / 30, - "TxThroughput": int(last_result["Frames_Tx"][port_num]) / 30, - } - if key: - avg_latency = latency["Store-Forward_Avg_latency_ns"][port_num] - min_latency = latency["Store-Forward_Min_latency_ns"][port_num] - max_latency = latency["Store-Forward_Max_latency_ns"][port_num] - samples[port_name][key] = \ - {"Store-Forward_Avg_latency_ns": avg_latency, - "Store-Forward_Min_latency_ns": min_latency, - "Store-Forward_Max_latency_ns": max_latency} - except IndexError: - pass + def _init_ix_scenario(self): + ixia_config = self.scenario_helper.scenario_cfg.get('ixia_config', 'IxiaBasic') - return samples + if ixia_config in self._ixia_scenarios: + scenario_type = self._ixia_scenarios[ixia_config] + + self._ix_scenario = scenario_type(self.client, self.context_cfg, + self.scenario_helper.scenario_cfg['options']) + else: + raise RuntimeError( + "IXIA config type '{}' not supported".format(ixia_config)) + + def _initialize_client(self, traffic_profile): + """Initialize the IXIA IxNetwork client and configure the server""" + self.client.clear_config() + self.client.assign_ports() + self._ix_scenario.apply_config() + self._ix_scenario.create_traffic_model(traffic_profile) + + def update_tracking_options(self): + self._ix_scenario.update_tracking_options() def run_traffic(self, traffic_profile): if self._terminated.value: @@ -116,18 +706,13 @@ class IxiaResourceHelper(ClientResourceHelper): min_tol = self.rfc_helper.tolerance_low max_tol = self.rfc_helper.tolerance_high + precision = self.rfc_helper.tolerance_precision + resolution = self.rfc_helper.resolution default = "00:00:00:00:00:00" self._build_ports() - - # we don't know client_file_name until runtime as instantiate - client_file_name = \ - find_relative_file(self.scenario_helper.scenario_cfg['ixia_profile'], - self.scenario_helper.scenario_cfg["task_path"]) - self.client.ix_load_config(client_file_name) - time.sleep(WAIT_AFTER_CFG_LOAD) - - self.client.ix_assign_ports() + traffic_profile.update_traffic_profile(self) + self._initialize_client(traffic_profile) mac = {} for port_name in self.vnfd_helper.port_pairs.all_ports: @@ -139,49 +724,106 @@ class IxiaResourceHelper(ClientResourceHelper): mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default) mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default) - samples = {} - # Generate ixia traffic config... + self._ix_scenario.run_protocols() + try: while not self._terminated.value: - traffic_profile.execute_traffic(self, self.client, mac) + first_run = traffic_profile.execute_traffic(self, self.client, + mac) self.client_started.value = 1 - time.sleep(WAIT_FOR_TRAFFIC) - self.client.ix_stop_traffic() - samples = self.generate_samples(traffic_profile.ports) + # pylint: disable=unnecessary-lambda + utils.wait_until_true(lambda: self.client.is_traffic_stopped(), + timeout=traffic_profile.config.duration * 2) + rfc2544_opts = self._ix_scenario.get_tc_rfc2544_options() + samples = self.generate_samples(traffic_profile.ports, + traffic_profile.config.duration) + + completed, samples = traffic_profile.get_drop_percentage( + samples, min_tol, max_tol, precision, resolution, + first_run=first_run, tc_rfc2544_opts=rfc2544_opts) self._queue.put(samples) - status, samples = traffic_profile.get_drop_percentage(self, samples, min_tol, - max_tol, self.client, mac) - current = samples['CurrentDropPercentage'] - if min_tol <= current <= max_tol or status == 'Completed': + if completed: self._terminated.value = 1 - self.client.ix_stop_traffic() - self._queue.put(samples) + except Exception: # pylint: disable=broad-except + LOG.exception('Run Traffic terminated') + + self._ix_scenario.stop_protocols() + self.client_started.value = 0 + self._terminated.value = 1 + + def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover + LOG.info("Ixia resource_helper run_test") + if self._terminated.value: + return + + min_tol = self.rfc_helper.tolerance_low + max_tol = self.rfc_helper.tolerance_high + precision = self.rfc_helper.tolerance_precision + resolution = self.rfc_helper.resolution + default = "00:00:00:00:00:00" - if not self.rfc_helper.is_done(): - self._terminated.value = 1 - return + self._build_ports() + traffic_profile.update_traffic_profile(self) + self._initialize_client(traffic_profile) - traffic_profile.execute_traffic(self, self.client, mac) - for _ in range(5): - time.sleep(self.LATENCY_TIME_SLEEP) - self.client.ix_stop_traffic() - samples = self.generate_samples(traffic_profile.ports, 'latency', {}) + mac = {} + for port_name in self.vnfd_helper.port_pairs.all_ports: + intf = self.vnfd_helper.find_interface(name=port_name) + virt_intf = intf["virtual-interface"] + # we only know static traffic id by reading the json + # this is used by _get_ixia_trafficrofile + port_num = self.vnfd_helper.port_num(intf) + mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default) + mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default) + + self._ix_scenario.run_protocols() + + try: + completed = False + self.rfc_helper.iteration.value = 0 + self.client_started.value = 1 + while completed is False and not self._terminated.value: + LOG.info("Wait for task ...") + + try: + task = tasks_queue.get(True, 5) + except moves.queue.Empty: + continue + else: + if task != 'RUN_TRAFFIC': + continue + + self.rfc_helper.iteration.value += 1 + LOG.info("Got %s task, start iteration %d", task, + self.rfc_helper.iteration.value) + first_run = traffic_profile.execute_traffic(self, self.client, + mac) + # pylint: disable=unnecessary-lambda + utils.wait_until_true(lambda: self.client.is_traffic_stopped(), + timeout=traffic_profile.config.duration * 2) + samples = self.generate_samples(traffic_profile.ports, + traffic_profile.config.duration) + + completed, samples = traffic_profile.get_drop_percentage( + samples, min_tol, max_tol, precision, resolution, + first_run=first_run) self._queue.put(samples) - traffic_profile.start_ixia_latency(self, self.client, mac) - if self._terminated.value: - break - self.client.ix_stop_traffic() - except Exception: # pylint: disable=broad-except - LOG.exception("Run Traffic terminated") + if completed: + LOG.debug("IxiaResourceHelper::run_test - test completed") + results_queue.put('COMPLETE') + else: + results_queue.put('CONTINUE') + tasks_queue.task_done() - self._terminated.value = 1 + except Exception: # pylint: disable=broad-except + LOG.exception('Run Traffic terminated') - def collect_kpi(self): - self.rfc_helper.iteration.value += 1 - return super(IxiaResourceHelper, self).collect_kpi() + self._ix_scenario.stop_protocols() + self.client_started.value = 0 + LOG.debug("IxiaResourceHelper::run_test done") class IxiaTrafficGen(SampleVNFTrafficGen): diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py index 4e9f4bdc1..a9c0222ac 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,74 +11,49 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Trex traffic generation definitions which implements rfc2544 """ -from __future__ import absolute_import -from __future__ import print_function -import time import logging -from collections import Mapping - -from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen -from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper -from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper - -LOGGING = logging.getLogger(__name__) +import time +from six import moves +from yardstick.common import utils +from yardstick.network_services.vnf_generic.vnf import sample_vnf +from yardstick.network_services.vnf_generic.vnf import tg_trex +from trex_stl_lib.trex_stl_exceptions import STLError -class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper): - def is_done(self): - return self.latency and self.iteration.value > 10 +LOG = logging.getLogger(__name__) -class TrexRfcResourceHelper(TrexResourceHelper): +class TrexRfcResourceHelper(tg_trex.TrexResourceHelper): - LATENCY_TIME_SLEEP = 120 - RUN_DURATION = 30 - WAIT_TIME = 3 + SAMPLING_PERIOD = 2 + TRANSIENT_PERIOD = 10 - def __init__(self, setup_helper, rfc_helper_type=None): + def __init__(self, setup_helper): super(TrexRfcResourceHelper, self).__init__(setup_helper) - - if rfc_helper_type is None: - rfc_helper_type = TrexRfc2544ResourceHelper - - self.rfc2544_helper = rfc_helper_type(self.scenario_helper) + self.rfc2544_helper = sample_vnf.Rfc2544ResourceHelper( + self.scenario_helper) def _run_traffic_once(self, traffic_profile): - if self._terminated.value: - return - - traffic_profile.execute_traffic(self) self.client_started.value = 1 - time.sleep(self.RUN_DURATION) - self.client.stop(traffic_profile.ports) - time.sleep(self.WAIT_TIME) - samples = traffic_profile.get_drop_percentage(self) - self._queue.put(samples) - - if not self.rfc2544_helper.is_done(): - return - - self.client.stop(traffic_profile.ports) - self.client.reset(ports=traffic_profile.ports) - self.client.remove_all_streams(traffic_profile.ports) - traffic_profile.execute_traffic_latency(samples=samples) - multiplier = traffic_profile.calculate_pps(samples)[1] - for _ in range(5): - time.sleep(self.LATENCY_TIME_SLEEP) - self.client.stop(traffic_profile.ports) - time.sleep(self.WAIT_TIME) - last_res = self.client.get_stats(traffic_profile.ports) - if not isinstance(last_res, Mapping): - self._terminated.value = 1 - continue - self.generate_samples(traffic_profile.ports, 'latency', {}) - self._queue.put(samples) - self.client.start(mult=str(multiplier), - ports=traffic_profile.ports, - duration=120, force=True) + ports, port_pg_id = traffic_profile.execute_traffic(self) + + samples = [] + timeout = int(traffic_profile.config.duration) - self.TRANSIENT_PERIOD + time.sleep(self.TRANSIENT_PERIOD) + for _ in utils.Timer(timeout=timeout): + samples.append(self._get_samples(ports, port_pg_id=port_pg_id)) + time.sleep(self.SAMPLING_PERIOD) + + traffic_profile.stop_traffic(self) + completed, output = traffic_profile.get_drop_percentage( + samples, self.rfc2544_helper.tolerance_low, + self.rfc2544_helper.tolerance_high, + self.rfc2544_helper.correlated_traffic, + self.rfc2544_helper.resolution) + self._queue.put(output) + return completed def start_client(self, ports, mult=None, duration=None, force=True): self.client.start(ports=ports, mult=mult, duration=duration, force=force) @@ -86,12 +61,58 @@ class TrexRfcResourceHelper(TrexResourceHelper): def clear_client_stats(self, ports): self.client.clear_stats(ports=ports) - def collect_kpi(self): - self.rfc2544_helper.iteration.value += 1 - return super(TrexRfcResourceHelper, self).collect_kpi() - - -class TrexTrafficGenRFC(TrexTrafficGen): + def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover + LOG.debug("Trex resource_helper run_test") + if self._terminated.value: + return + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + self._queue.cancel_join_thread() + try: + self._build_ports() + self.client = self._connect() + self.client.reset(ports=self.all_ports) + self.client.remove_all_streams(self.all_ports) # remove all streams + traffic_profile.register_generator(self) + + completed = False + self.rfc2544_helper.iteration.value = 0 + self.client_started.value = 1 + while completed is False and not self._terminated.value: + LOG.debug("Wait for task ...") + try: + task = tasks_queue.get(True, 5) + except moves.queue.Empty: + LOG.debug("Wait for task timeout, continue waiting...") + continue + else: + if task != 'RUN_TRAFFIC': + continue + self.rfc2544_helper.iteration.value += 1 + LOG.info("Got %s task, start iteration %d", task, + self.rfc2544_helper.iteration.value) + completed = self._run_traffic_once(traffic_profile) + if completed: + LOG.debug("%s::run_test - test completed", + self.__class__.__name__) + results_queue.put('COMPLETE') + else: + results_queue.put('CONTINUE') + tasks_queue.task_done() + + self.client.stop(self.all_ports) + self.client.disconnect() + self._terminated.value = 0 + except STLError: + if self._terminated.value: + LOG.debug("traffic generator is stopped") + return # return if trex/tg server is stopped. + raise + + self.client_started.value = 0 + LOG.debug("%s::run_test done", self.__class__.__name__) + +class TrexTrafficGenRFC(tg_trex.TrexTrafficGen): """ This class handles mapping traffic profile and generating traffic for rfc2544 testcase. diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py index 0084a124c..0cb66a714 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,9 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Trex acts as traffic generation and vnf definitions based on IETS Spec """ -from __future__ import absolute_import +import datetime import logging import os @@ -25,6 +24,7 @@ from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTraff from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper + LOG = logging.getLogger(__name__) @@ -165,6 +165,34 @@ class TrexResourceHelper(ClientResourceHelper): cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1" self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT)) + def _get_samples(self, ports, port_pg_id=None): + stats = self.get_stats(ports) + timestamp = datetime.datetime.now() + samples = {} + for pname in (intf['name'] for intf in self.vnfd_helper.interfaces): + port_num = self.vnfd_helper.port_num(pname) + port_stats = stats.get(port_num, {}) + samples[pname] = { + 'rx_throughput_fps': float(port_stats.get('rx_pps', 0.0)), + 'tx_throughput_fps': float(port_stats.get('tx_pps', 0.0)), + 'rx_throughput_bps': float(port_stats.get('rx_bps', 0.0)), + 'tx_throughput_bps': float(port_stats.get('tx_bps', 0.0)), + 'in_packets': int(port_stats.get('ipackets', 0)), + 'out_packets': int(port_stats.get('opackets', 0)), + 'in_bytes': int(port_stats.get('ibytes', 0)), + 'out_bytes': int(port_stats.get('obytes', 0)), + 'timestamp': timestamp + } + + pg_id_list = port_pg_id.get_pg_ids(port_num) + samples[pname]['latency'] = {} + for pg_id in pg_id_list: + latency_global = stats.get('latency', {}) + pg_latency = latency_global.get(pg_id, {}).get('latency') + samples[pname]['latency'][pg_id] = pg_latency + + return samples + class TrexTrafficGen(SampleVNFTrafficGen): """ diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex_vpp.py b/yardstick/network_services/vnf_generic/vnf/tg_trex_vpp.py new file mode 100644 index 000000000..846304880 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex_vpp.py @@ -0,0 +1,178 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from trex_stl_lib.trex_stl_exceptions import STLError + +from yardstick.common.utils import safe_cast +from yardstick.network_services.vnf_generic.vnf.sample_vnf import \ + Rfc2544ResourceHelper +from yardstick.network_services.vnf_generic.vnf.sample_vnf import \ + SampleVNFTrafficGen +from yardstick.network_services.vnf_generic.vnf.tg_trex import \ + TrexDpdkVnfSetupEnvHelper +from yardstick.network_services.vnf_generic.vnf.tg_trex import \ + TrexResourceHelper + +LOGGING = logging.getLogger(__name__) + + +class TrexVppResourceHelper(TrexResourceHelper): + + def __init__(self, setup_helper, rfc_helper_type=None): + super(TrexVppResourceHelper, self).__init__(setup_helper) + + if rfc_helper_type is None: + rfc_helper_type = Rfc2544ResourceHelper + + self.rfc2544_helper = rfc_helper_type(self.scenario_helper) + + self.loss = None + self.sent = None + self.latency = None + + def generate_samples(self, stats=None, ports=None, port_pg_id=None, + latency=False): + samples = {} + if stats is None: + stats = self.get_stats(ports) + for pname in (intf['name'] for intf in self.vnfd_helper.interfaces): + port_num = self.vnfd_helper.port_num(pname) + port_stats = stats.get(port_num, {}) + samples[pname] = { + 'rx_throughput_fps': float(port_stats.get('rx_pps', 0.0)), + 'tx_throughput_fps': float(port_stats.get('tx_pps', 0.0)), + 'rx_throughput_bps': float(port_stats.get('rx_bps', 0.0)), + 'tx_throughput_bps': float(port_stats.get('tx_bps', 0.0)), + 'in_packets': int(port_stats.get('ipackets', 0)), + 'out_packets': int(port_stats.get('opackets', 0)), + } + + if latency: + pg_id_list = port_pg_id.get_pg_ids(port_num) + samples[pname]['latency'] = {} + for pg_id in pg_id_list: + latency_global = stats.get('latency', {}) + pg_latency = latency_global.get(pg_id, {}).get('latency') + + t_min = safe_cast(pg_latency.get("total_min", 0.0), float, + -1.0) + t_avg = safe_cast(pg_latency.get("average", 0.0), float, + -1.0) + t_max = safe_cast(pg_latency.get("total_max", 0.0), float, + -1.0) + + latency = { + "min_latency": t_min, + "max_latency": t_max, + "avg_latency": t_avg, + } + samples[pname]['latency'][pg_id] = latency + + return samples + + def _run_traffic_once(self, traffic_profile): + self.client_started.value = 1 + traffic_profile.execute_traffic(self) + return True + + def run_traffic(self, traffic_profile): + self._queue.cancel_join_thread() + traffic_profile.init_queue(self._queue) + super(TrexVppResourceHelper, self).run_traffic(traffic_profile) + + @staticmethod + def fmt_latency(lat_min, lat_avg, lat_max): + t_min = int(round(safe_cast(lat_min, float, -1.0))) + t_avg = int(round(safe_cast(lat_avg, float, -1.0))) + t_max = int(round(safe_cast(lat_max, float, -1.0))) + + return "/".join(str(tmp) for tmp in (t_min, t_avg, t_max)) + + def send_traffic_on_tg(self, ports, port_pg_id, duration, rate, + latency=False): + try: + # Choose rate and start traffic: + self.client.start(ports=ports, mult=rate, duration=duration) + # Block until done: + try: + self.client.wait_on_traffic(ports=ports, timeout=duration + 20) + except STLError as err: + self.client.stop(ports) + LOGGING.error("TRex stateless timeout error: %s", err) + + if self.client.get_warnings(): + for warning in self.client.get_warnings(): + LOGGING.warning(warning) + + # Read the stats after the test + stats = self.client.get_stats() + + packets_in = [] + packets_out = [] + for port in ports: + packets_in.append(stats[port]["ipackets"]) + packets_out.append(stats[port]["opackets"]) + + if latency: + self.latency = [] + pg_id_list = port_pg_id.get_pg_ids(port) + for pg_id in pg_id_list: + latency_global = stats.get('latency', {}) + pg_latency = latency_global.get(pg_id, {}).get( + 'latency') + lat = self.fmt_latency( + str(pg_latency.get("total_min")), + str(pg_latency.get("average")), + str(pg_latency.get("total_max"))) + LOGGING.info( + "latencyStream%s(usec)=%s", pg_id, lat) + self.latency.append(lat) + + self.sent = sum(packets_out) + total_rcvd = sum(packets_in) + self.loss = self.sent - total_rcvd + LOGGING.info("rate=%s, totalReceived=%s, totalSent=%s," + " frameLoss=%s", rate, total_rcvd, self.sent, + self.loss) + return stats + except STLError as err: + LOGGING.error("TRex stateless runtime error: %s", err) + raise RuntimeError('TRex stateless runtime error') + + +class TrexTrafficGenVpp(SampleVNFTrafficGen): + APP_NAME = 'TRex' + WAIT_TIME = 20 + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = TrexDpdkVnfSetupEnvHelper + if resource_helper_type is None: + resource_helper_type = TrexVppResourceHelper + + super(TrexTrafficGenVpp, self).__init__( + name, vnfd, setup_env_helper_type, resource_helper_type) + + def _check_status(self): + return self.resource_helper.check_status() + + def _start_server(self): + super(TrexTrafficGenVpp, self)._start_server() + self.resource_helper.start() + + def wait_for_instantiate(self): + return self._wait_for_process() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_vcmts_pktgen.py b/yardstick/network_services/vnf_generic/vnf/tg_vcmts_pktgen.py new file mode 100755 index 000000000..c6df9d04c --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_vcmts_pktgen.py @@ -0,0 +1,215 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import socket +import yaml +import os + +from yardstick.network_services.vnf_generic.vnf import sample_vnf +from yardstick.common import exceptions + + +LOG = logging.getLogger(__name__) + + +class PktgenHelper(object): + + RETRY_SECONDS = 0.5 + RETRY_COUNT = 20 + CONNECT_TIMEOUT = 5 + + def __init__(self, host, port=23000): + self.host = host + self.port = port + self.connected = False + + def _connect(self): + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ret = True + try: + self._sock.settimeout(self.CONNECT_TIMEOUT) + self._sock.connect((self.host, self.port)) + except (socket.gaierror, socket.error, socket.timeout): + self._sock.close() + ret = False + + return ret + + def connect(self): + if self.connected: + return True + LOG.info("Connecting to pktgen instance at %s...", self.host) + for idx in range(self.RETRY_COUNT): + self.connected = self._connect() + if self.connected: + return True + LOG.debug("Connection attempt %d: Unable to connect to %s, " \ + "retrying in %d seconds", + idx, self.host, self.RETRY_SECONDS) + time.sleep(self.RETRY_SECONDS) + + LOG.error("Unable to connect to pktgen instance on %s !", + self.host) + return False + + + def send_command(self, command): + if not self.connected: + LOG.error("Pktgen socket is not connected") + return False + + try: + self._sock.sendall((command + "\n").encode()) + time.sleep(1) + except (socket.timeout, socket.error): + LOG.error("Error sending command '%s'", command) + return False + + return True + + +class VcmtsPktgenSetupEnvHelper(sample_vnf.SetupEnvHelper): + + BASE_PARAMETERS = "export LUA_PATH=/vcmts/Pktgen.lua;"\ + + "export CMK_PROC_FS=/host/proc;" + + PORTS_COUNT = 8 + + def generate_pcap_filename(self, port_cfg): + return port_cfg['traffic_type'] + "_" + port_cfg['num_subs'] \ + + "cms_" + port_cfg['num_ofdm'] + "ofdm.pcap" + + def find_port_cfg(self, ports_cfg, port_name): + for port_cfg in ports_cfg: + if port_name in port_cfg: + return port_cfg + return None + + def build_pktgen_parameters(self, pod_cfg): + ports_cfg = pod_cfg['ports'] + port_cfg = list() + + for i in range(self.PORTS_COUNT): + port_cfg.append(self.find_port_cfg(ports_cfg, 'port_' + str(i))) + + pktgen_parameters = self.BASE_PARAMETERS + " " \ + + " /pktgen-config/setup.sh " + pod_cfg['pktgen_id'] \ + + " " + pod_cfg['num_ports'] + + for i in range(self.PORTS_COUNT): + pktgen_parameters += " " + port_cfg[i]['net_pktgen'] + + for i in range(self.PORTS_COUNT): + pktgen_parameters += " " + self.generate_pcap_filename(port_cfg[i]) + + return pktgen_parameters + + def start_pktgen(self, pod_cfg): + self.ssh_helper.drop_connection() + cmd = self.build_pktgen_parameters(pod_cfg) + LOG.debug("Executing: '%s'", cmd) + self.ssh_helper.send_command(cmd) + LOG.info("Pktgen executed") + + def setup_vnf_environment(self): + pass + + +class VcmtsPktgen(sample_vnf.SampleVNFTrafficGen): + + TG_NAME = 'VcmtsPktgen' + APP_NAME = 'VcmtsPktgen' + RUN_WAIT = 4 + DEFAULT_RATE = 8.0 + + PKTGEN_BASE_PORT = 23000 + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = VcmtsPktgenSetupEnvHelper + super(VcmtsPktgen, self).__init__( + name, vnfd, setup_env_helper_type, resource_helper_type) + + self.pktgen_address = vnfd['mgmt-interface']['ip'] + LOG.info("Pktgen container '%s', IP: %s", name, self.pktgen_address) + + def extract_pod_cfg(self, pktgen_pods_cfg, pktgen_id): + for pod_cfg in pktgen_pods_cfg: + if pod_cfg['pktgen_id'] == pktgen_id: + return pod_cfg + return None + + def instantiate(self, scenario_cfg, context_cfg): + super(VcmtsPktgen, self).instantiate(scenario_cfg, context_cfg) + self._start_server() + options = scenario_cfg.get('options', {}) + self.pktgen_rate = options.get('pktgen_rate', self.DEFAULT_RATE) + + try: + pktgen_values_filepath = options['pktgen_values'] + except KeyError: + raise KeyError("Missing pktgen_values key in scenario options" \ + "section of the task definition file") + + if not os.path.isfile(pktgen_values_filepath): + raise RuntimeError("The pktgen_values file path provided " \ + "does not exists") + + # The yaml_loader.py (SafeLoader) underlying regex has an issue + # with reading PCI addresses (processed as double). so the + # BaseLoader is used here. + with open(pktgen_values_filepath) as stream: + pktgen_values = yaml.load(stream, Loader=yaml.BaseLoader) + + if pktgen_values == None: + raise RuntimeError("Error reading pktgen_values file provided (" + + pktgen_values_filepath + ")") + + self.pktgen_id = int(options[self.name]['pktgen_id']) + self.resource_helper.pktgen_id = self.pktgen_id + + self.pktgen_helper = PktgenHelper(self.pktgen_address, + self.PKTGEN_BASE_PORT + self.pktgen_id) + + pktgen_pods_cfg = pktgen_values['topology']['pktgen_pods'] + + self.pod_cfg = self.extract_pod_cfg(pktgen_pods_cfg, + str(self.pktgen_id)) + + if self.pod_cfg == None: + raise KeyError("Pktgen with id " + str(self.pktgen_id) + \ + " was not found") + + self.setup_helper.start_pktgen(self.pod_cfg) + + def run_traffic(self, traffic_profile): + if not self.pktgen_helper.connect(): + raise exceptions.PktgenActionError(command="connect") + LOG.info("Connected to pktgen instance at %s", self.pktgen_address) + + commands = [] + for i in range(self.setup_helper.PORTS_COUNT): + commands.append('pktgen.set("' + str(i) + '", "rate", ' + + "%0.1f" % self.pktgen_rate + ');') + + commands.append('pktgen.start("all");') + + for command in commands: + if self.pktgen_helper.send_command(command): + LOG.debug("Command '%s' sent to pktgen", command) + LOG.info("Traffic started on %s...", self.name) + return True diff --git a/yardstick/network_services/vnf_generic/vnf/udp_replay.py b/yardstick/network_services/vnf_generic/vnf/udp_replay.py index a57f53bc7..a3b0b9fd9 100644 --- a/yardstick/network_services/vnf_generic/vnf/udp_replay.py +++ b/yardstick/network_services/vnf_generic/vnf/udp_replay.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper - +from yardstick.benchmark.contexts import base as ctx_base LOG = logging.getLogger(__name__) @@ -79,9 +79,11 @@ class UdpReplayApproxVnf(SampleVNF): ports_mask_hex = hex(sum(2 ** num for num in port_nums)) # one core extra for master cpu_mask_hex = hex(2 ** (number_of_ports + 1) - 1) + nfvi_context = ctx_base.Context.get_context_from_server( + self.scenario_helper.nodes[self.name]) hw_csum = "" if (not self.scenario_helper.options.get('hw_csum', False) or - self.nfvi_context.attrs.get('nfvi_type') not in self.HW_OFFLOADING_NFVI_TYPES): + nfvi_context.attrs.get('nfvi_type') not in self.HW_OFFLOADING_NFVI_TYPES): hw_csum = '--no-hw-csum' # tuples of (FLD_PORT, FLD_QUEUE, FLD_LCORE) @@ -107,7 +109,7 @@ class UdpReplayApproxVnf(SampleVNF): def collect_kpi(self): def get_sum(offset): - return sum(int(i) for i in split_stats[offset::5]) + return sum(int(i) for i in split_stats[offset::6]) # we can't get KPIs if the VNF is down check_if_process_failed(self._vnf_process) @@ -115,8 +117,13 @@ class UdpReplayApproxVnf(SampleVNF): stats = self.get_stats() stats_words = stats.split() - split_stats = stats_words[stats_words.index('0'):][:number_of_ports * 5] + split_stats = stats_words[stats_words.index('arp_pkts') + 1:][:number_of_ports * 6] + + physical_node = ctx_base.Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + result = { + "physical_node": physical_node, "packets_in": get_sum(1), "packets_fwd": get_sum(2), "packets_dropped": get_sum(3) + get_sum(4), diff --git a/yardstick/network_services/vnf_generic/vnf/vcmts_vnf.py b/yardstick/network_services/vnf_generic/vnf/vcmts_vnf.py new file mode 100755 index 000000000..0b48ef4e9 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/vcmts_vnf.py @@ -0,0 +1,273 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import yaml + +from influxdb import InfluxDBClient + +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SetupEnvHelper +from yardstick.common import constants +from yardstick.common import exceptions +from yardstick.network_services.vnf_generic.vnf.base import GenericVNF +from yardstick.network_services.vnf_generic.vnf.sample_vnf import ScenarioHelper +from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper +from yardstick.network_services.utils import get_nsb_option + + +LOG = logging.getLogger(__name__) + + +class InfluxDBHelper(object): + + INITIAL_VALUE = 'now() - 1m' + + def __init__(self, vcmts_influxdb_ip, vcmts_influxdb_port): + self._vcmts_influxdb_ip = vcmts_influxdb_ip + self._vcmts_influxdb_port = vcmts_influxdb_port + self._last_upstream_rx = self.INITIAL_VALUE + self._last_values_time = dict() + + def start(self): + self._read_client = InfluxDBClient(host=self._vcmts_influxdb_ip, + port=self._vcmts_influxdb_port, + database='collectd') + self._write_client = InfluxDBClient(host=constants.INFLUXDB_IP, + port=constants.INFLUXDB_PORT, + database='collectd') + + def _get_last_value_time(self, measurement): + if measurement in self._last_values_time: + return self._last_values_time[measurement] + return self.INITIAL_VALUE + + def _set_last_value_time(self, measurement, time): + self._last_values_time[measurement] = "'" + time + "'" + + def _query_measurement(self, measurement): + # There is a delay before influxdb flushes the data + query = "SELECT * FROM " + measurement + " WHERE time > " \ + + self._get_last_value_time(measurement) \ + + " ORDER BY time ASC;" + query_result = self._read_client.query(query) + if len(query_result.keys()) == 0: + return None + return query_result.get_points(measurement) + + def _rw_measurment(self, measurement, columns): + query_result = self._query_measurement(measurement) + if query_result == None: + return + + points_to_write = list() + for entry in query_result: + point = { + "measurement": measurement, + "tags": { + "type": entry['type'], + "host": entry['host'] + }, + "time": entry['time'], + "fields": {} + } + + for column in columns: + if column == 'value': + point["fields"][column] = float(entry[column]) + else: + point["fields"][column] = entry[column] + + points_to_write.append(point) + self._set_last_value_time(measurement, entry['time']) + + # Write the points to yardstick database + if self._write_client.write_points(points_to_write): + LOG.debug("%d new points written to '%s' measurement", + len(points_to_write), measurement) + + def copy_kpi(self): + self._rw_measurment("cpu_value", ["instance", "type_instance", "value"]) + self._rw_measurment("cpufreq_value", ["type_instance", "value"]) + self._rw_measurment("downstream_rx", ["value"]) + self._rw_measurment("downstream_tx", ["value"]) + self._rw_measurment("downstream_value", ["value"]) + self._rw_measurment("ds_per_cm_value", ["instance", "value"]) + self._rw_measurment("intel_rdt_value", ["instance", "type_instance", "value"]) + self._rw_measurment("turbostat_value", ["instance", "type_instance", "value"]) + self._rw_measurment("upstream_rx", ["value"]) + self._rw_measurment("upstream_tx", ["value"]) + self._rw_measurment("upstream_value", ["value"]) + + +class VcmtsdSetupEnvHelper(SetupEnvHelper): + + BASE_PARAMETERS = "export LD_LIBRARY_PATH=/opt/collectd/lib:;"\ + + "export CMK_PROC_FS=/host/proc;" + + def build_us_parameters(self, pod_cfg): + return self.BASE_PARAMETERS + " " \ + + " /opt/bin/cmk isolate --conf-dir=/etc/cmk" \ + + " --socket-id=" + pod_cfg['cpu_socket_id'] \ + + " --pool=shared" \ + + " /vcmts-config/run_upstream.sh " + pod_cfg['sg_id'] \ + + " " + pod_cfg['ds_core_type'] \ + + " " + pod_cfg['num_ofdm'] + "ofdm" \ + + " " + pod_cfg['num_subs'] + "cm" \ + + " " + pod_cfg['cm_crypto'] \ + + " " + pod_cfg['qat'] \ + + " " + pod_cfg['net_us'] \ + + " " + pod_cfg['power_mgmt'] + + def build_ds_parameters(self, pod_cfg): + return self.BASE_PARAMETERS + " " \ + + " /opt/bin/cmk isolate --conf-dir=/etc/cmk" \ + + " --socket-id=" + pod_cfg['cpu_socket_id'] \ + + " --pool=" + pod_cfg['ds_core_type'] \ + + " /vcmts-config/run_downstream.sh " + pod_cfg['sg_id'] \ + + " " + pod_cfg['ds_core_type'] \ + + " " + pod_cfg['ds_core_pool_index'] \ + + " " + pod_cfg['num_ofdm'] + "ofdm" \ + + " " + pod_cfg['num_subs'] + "cm" \ + + " " + pod_cfg['cm_crypto'] \ + + " " + pod_cfg['qat'] \ + + " " + pod_cfg['net_ds'] \ + + " " + pod_cfg['power_mgmt'] + + def build_cmd(self, stream_dir, pod_cfg): + if stream_dir == 'ds': + return self.build_ds_parameters(pod_cfg) + else: + return self.build_us_parameters(pod_cfg) + + def run_vcmtsd(self, stream_dir, pod_cfg): + cmd = self.build_cmd(stream_dir, pod_cfg) + LOG.debug("Executing %s", cmd) + self.ssh_helper.send_command(cmd) + + def setup_vnf_environment(self): + pass + + +class VcmtsVNF(GenericVNF): + + RUN_WAIT = 4 + + def __init__(self, name, vnfd): + super(VcmtsVNF, self).__init__(name, vnfd) + self.name = name + self.bin_path = get_nsb_option('bin_path', '') + self.scenario_helper = ScenarioHelper(self.name) + self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path) + + self.setup_helper = VcmtsdSetupEnvHelper(self.vnfd_helper, + self.ssh_helper, + self.scenario_helper) + + def extract_pod_cfg(self, vcmts_pods_cfg, sg_id): + for pod_cfg in vcmts_pods_cfg: + if pod_cfg['sg_id'] == sg_id: + return pod_cfg + + def instantiate(self, scenario_cfg, context_cfg): + self._update_collectd_options(scenario_cfg, context_cfg) + self.scenario_helper.scenario_cfg = scenario_cfg + self.context_cfg = context_cfg + + options = scenario_cfg.get('options', {}) + + try: + self.vcmts_influxdb_ip = options['vcmts_influxdb_ip'] + self.vcmts_influxdb_port = options['vcmts_influxdb_port'] + except KeyError: + raise KeyError("Missing destination InfluxDB details in scenario" \ + " section of the task definition file") + + try: + vcmtsd_values_filepath = options['vcmtsd_values'] + except KeyError: + raise KeyError("Missing vcmtsd_values key in scenario options" \ + "section of the task definition file") + + if not os.path.isfile(vcmtsd_values_filepath): + raise RuntimeError("The vcmtsd_values file path provided " \ + "does not exists") + + # The yaml_loader.py (SafeLoader) underlying regex has an issue + # with reading PCI addresses (processed as double). so the + # BaseLoader is used here. + with open(vcmtsd_values_filepath) as stream: + vcmtsd_values = yaml.load(stream, Loader=yaml.BaseLoader) + + if vcmtsd_values == None: + raise RuntimeError("Error reading vcmtsd_values file provided (" + + vcmtsd_values_filepath + ")") + + vnf_options = options.get(self.name, {}) + sg_id = str(vnf_options['sg_id']) + stream_dir = vnf_options['stream_dir'] + + try: + vcmts_pods_cfg = vcmtsd_values['topology']['vcmts_pods'] + except KeyError: + raise KeyError("Missing vcmts_pods key in the " \ + "vcmtsd_values file provided") + + pod_cfg = self.extract_pod_cfg(vcmts_pods_cfg, sg_id) + if pod_cfg == None: + raise exceptions.IncorrectConfig(error_msg="Service group " + sg_id + " not found") + + self.setup_helper.run_vcmtsd(stream_dir, pod_cfg) + + def _update_collectd_options(self, scenario_cfg, context_cfg): + scenario_options = scenario_cfg.get('options', {}) + generic_options = scenario_options.get('collectd', {}) + scenario_node_options = scenario_options.get(self.name, {})\ + .get('collectd', {}) + context_node_options = context_cfg.get('nodes', {})\ + .get(self.name, {}).get('collectd', {}) + + options = generic_options + self._update_options(options, scenario_node_options) + self._update_options(options, context_node_options) + + self.setup_helper.collectd_options = options + + def _update_options(self, options, additional_options): + for k, v in additional_options.items(): + if isinstance(v, dict) and k in options: + options[k].update(v) + else: + options[k] = v + + def wait_for_instantiate(self): + pass + + def terminate(self): + pass + + def scale(self, flavor=""): + pass + + def collect_kpi(self): + self.influxdb_helper.copy_kpi() + return {"n/a": "n/a"} + + def start_collect(self): + self.influxdb_helper = InfluxDBHelper(self.vcmts_influxdb_ip, + self.vcmts_influxdb_port) + self.influxdb_helper.start() + + def stop_collect(self): + pass diff --git a/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py index 6c95648ce..743f2d4bb 100644 --- a/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,23 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import import logging -from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file -from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper -from yardstick.network_services.yang_model import YangModel +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF +from yardstick.network_services.vnf_generic.vnf.acl_vnf import AclApproxSetupEnvSetupEnvHelper LOG = logging.getLogger(__name__) # vFW should work the same on all systems, we can provide the binary -FW_PIPELINE_COMMAND = """sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script}""" +FW_PIPELINE_COMMAND = "sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script} {hwlb}" FW_COLLECT_KPI = (r"""VFW TOTAL:[^p]+pkts_received"?:\s(\d+),[^p]+pkts_fw_forwarded"?:\s(\d+),""" r"""[^p]+pkts_drop_fw"?:\s(\d+),\s""") -class FWApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): +class FWApproxSetupEnvHelper(AclApproxSetupEnvSetupEnvHelper): APP_NAME = "vFW" CFG_CONFIG = "/tmp/vfw_config" @@ -38,6 +36,8 @@ class FWApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): SW_DEFAULT_CORE = 5 HW_DEFAULT_CORE = 2 VNF_TYPE = "VFW" + RULE_CMD = "vfw" + DEFAULT_FWD_ACTIONS = ["accept", "count", "conntrack"] class FWApproxVnf(SampleVNF): @@ -57,11 +57,7 @@ class FWApproxVnf(SampleVNF): setup_env_helper_type = FWApproxSetupEnvHelper super(FWApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) - self.vfw_rules = None - def _start_vnf(self): - yang_model_path = find_relative_file(self.scenario_helper.options['rules'], - self.scenario_helper.task_path) - yang_model = YangModel(yang_model_path) - self.vfw_rules = yang_model.get_rules() - super(FWApproxVnf, self)._start_vnf() + def wait_for_instantiate(self): + """Wait for VNF to initialize""" + self.wait_for_initialize() diff --git a/yardstick/network_services/vnf_generic/vnf/vims_vnf.py b/yardstick/network_services/vnf_generic/vnf/vims_vnf.py new file mode 100644 index 000000000..0e339b171 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/vims_vnf.py @@ -0,0 +1,105 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from yardstick.network_services.vnf_generic.vnf import sample_vnf + +LOG = logging.getLogger(__name__) + + +class VimsSetupEnvHelper(sample_vnf.SetupEnvHelper): + + def setup_vnf_environment(self): + LOG.debug('VimsSetupEnvHelper:\n') + + +class VimsResourceHelper(sample_vnf.ClientResourceHelper): + pass + + +class VimsPcscfVnf(sample_vnf.SampleVNF): + + APP_NAME = "VimsPcscf" + APP_WORD = "VimsPcscf" + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = VimsResourceHelper + if setup_env_helper_type is None: + setup_env_helper_type = VimsSetupEnvHelper + super(VimsPcscfVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) + + def wait_for_instantiate(self): + pass + + def _run(self): + pass + + def start_collect(self): + # TODO + pass + + def collect_kpi(self): + # TODO + pass + + +class VimsHssVnf(sample_vnf.SampleVNF): + + APP_NAME = "VimsHss" + APP_WORD = "VimsHss" + CMD = "sudo /media/generate_user.sh {} {} >> /dev/null 2>&1" + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = VimsResourceHelper + if setup_env_helper_type is None: + setup_env_helper_type = VimsSetupEnvHelper + super(VimsHssVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) + self.start_user = 1 + self.end_user = 10000 + self.WAIT_TIME = 600 + + def instantiate(self, scenario_cfg, context_cfg): + LOG.debug("scenario_cfg=%s\n", scenario_cfg) + self.start_user = scenario_cfg.get("options", {}).get("start_user", self.start_user) + self.end_user = scenario_cfg.get("options", {}).get("end_user", self.end_user) + # TODO + # Need to check HSS services are ready before generating user accounts + # Now, adding time sleep that manually configured by user + # to wait for HSS services. + # Note: for heat, waiting time is too long (~ 600s) + self.WAIT_TIME = scenario_cfg.get("options", {}).get("wait_time", self.WAIT_TIME) + time.sleep(self.WAIT_TIME) + LOG.debug("Generate user accounts from %d to %d\n", + self.start_user, self.end_user) + cmd = self.CMD.format(self.start_user, self.end_user) + self.ssh_helper.execute(cmd, None, 3600, False) + + def wait_for_instantiate(self): + pass + + def start_collect(self): + # TODO + pass + + def collect_kpi(self): + # TODO + pass diff --git a/yardstick/network_services/vnf_generic/vnf/vnf_ssh_helper.py b/yardstick/network_services/vnf_generic/vnf/vnf_ssh_helper.py new file mode 100644 index 000000000..6c5c6c833 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/vnf_ssh_helper.py @@ -0,0 +1,62 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os + +from six.moves import StringIO + +from yardstick.network_services import constants +from yardstick.ssh import AutoConnectSSH + +LOG = logging.getLogger(__name__) + + +class VnfSshHelper(AutoConnectSSH): + + def __init__(self, node, bin_path, wait=None): + self.node = node + kwargs = self.args_from_node(self.node) + if wait: + # if wait is defined here we want to override + kwargs['wait'] = wait + + super(VnfSshHelper, self).__init__(**kwargs) + self.bin_path = bin_path + + @staticmethod + def get_class(): + # must return static class name, anything else refers to the calling class + # i.e. the subclass, not the superclass + return VnfSshHelper + + def copy(self): + # this copy constructor is different from SSH classes, since it uses node + return self.get_class()(self.node, self.bin_path) + + def upload_config_file(self, prefix, content): + cfg_file = os.path.join(constants.REMOTE_TMP, prefix) + LOG.debug('Config file name: %s', cfg_file) + LOG.debug(content) + file_obj = StringIO(content) + self.put_file_obj(file_obj, cfg_file) + return cfg_file + + def join_bin_path(self, *args): + return os.path.join(self.bin_path, *args) + + def provision_tool(self, tool_path=None, tool_file=None): + if tool_path is None: + tool_path = self.bin_path + return super(VnfSshHelper, self).provision_tool(tool_path, tool_file) diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index c02c0eb27..322ecd016 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,40 +17,30 @@ from __future__ import absolute_import from __future__ import print_function -import os import logging import re import posixpath -from six.moves import configparser, zip - +from yardstick.common import utils from yardstick.common.process import check_if_process_failed from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.pipeline import PipelineRules from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper +from yardstick.benchmark.contexts import base as ctx_base LOG = logging.getLogger(__name__) -VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script}""" +VPE_PIPELINE_COMMAND = "sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script} {hwlb}" VPE_COLLECT_KPI = """\ -Pkts in:\s(\d+)\r\n\ -\tPkts dropped by AH:\s(\d+)\r\n\ -\tPkts dropped by other:\s(\d+)\ +Pkts in:\\s(\\d+)\r\n\ +\tPkts dropped by AH:\\s(\\d+)\r\n\ +\tPkts dropped by other:\\s(\\d+)\ """ class ConfigCreate(object): - @staticmethod - def vpe_tmq(config, index): - tm_q = 'TM{0}'.format(index) - config.add_section(tm_q) - config.set(tm_q, 'burst_read', '24') - config.set(tm_q, 'burst_write', '32') - config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg') - return config - def __init__(self, vnfd_helper, socket): super(ConfigCreate, self).__init__() self.sw_q = -1 @@ -61,122 +51,8 @@ class ConfigCreate(object): self.downlink_ports = self.vnfd_helper.port_pairs.downlink_ports self.pipeline_per_port = 9 self.socket = socket + self._dpdk_port_to_link_id_map = None - def vpe_initialize(self, config): - config.add_section('EAL') - config.set('EAL', 'log_level', '0') - - config.add_section('PIPELINE0') - config.set('PIPELINE0', 'type', 'MASTER') - config.set('PIPELINE0', 'core', 's%sC0' % self.socket) - - config.add_section('MEMPOOL0') - config.set('MEMPOOL0', 'pool_size', '256K') - - config.add_section('MEMPOOL1') - config.set('MEMPOOL1', 'pool_size', '2M') - return config - - def vpe_rxq(self, config): - for port in self.downlink_ports: - new_section = 'RXQ{0}.0'.format(self.vnfd_helper.port_num(port)) - config.add_section(new_section) - config.set(new_section, 'mempool', 'MEMPOOL1') - - return config - - def get_sink_swq(self, parser, pipeline, k, index): - sink = "" - pktq = parser.get(pipeline, k) - if "SINK" in pktq: - self.sink_q += 1 - sink = " SINK{0}".format(self.sink_q) - if "TM" in pktq: - sink = " TM{0}".format(index) - pktq = "SWQ{0}{1}".format(self.sw_q, sink) - return pktq - - def vpe_upstream(self, vnf_cfg, index=0): - parser = configparser.ConfigParser() - parser.read(os.path.join(vnf_cfg, 'vpe_upstream')) - - for pipeline in parser.sections(): - for k, v in parser.items(pipeline): - if k == "pktq_in": - if "RXQ" in v: - port = self.vnfd_helper.port_num(self.uplink_ports[index]) - value = "RXQ{0}.0".format(port) - else: - value = self.get_sink_swq(parser, pipeline, k, index) - - parser.set(pipeline, k, value) - - elif k == "pktq_out": - if "TXQ" in v: - port = self.vnfd_helper.port_num(self.downlink_ports[index]) - value = "TXQ{0}.0".format(port) - else: - self.sw_q += 1 - value = self.get_sink_swq(parser, pipeline, k, index) - - parser.set(pipeline, k, value) - - new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline) - if new_pipeline != pipeline: - parser._sections[new_pipeline] = parser._sections[pipeline] - parser._sections.pop(pipeline) - self.n_pipeline += 1 - return parser - - def vpe_downstream(self, vnf_cfg, index): - parser = configparser.ConfigParser() - parser.read(os.path.join(vnf_cfg, 'vpe_downstream')) - for pipeline in parser.sections(): - for k, v in parser.items(pipeline): - - if k == "pktq_in": - port = self.vnfd_helper.port_num(self.downlink_ports[index]) - if "RXQ" not in v: - value = self.get_sink_swq(parser, pipeline, k, index) - elif "TM" in v: - value = "RXQ{0}.0 TM{1}".format(port, index) - else: - value = "RXQ{0}.0".format(port) - - parser.set(pipeline, k, value) - - if k == "pktq_out": - port = self.vnfd_helper.port_num(self.uplink_ports[index]) - if "TXQ" not in v: - self.sw_q += 1 - value = self.get_sink_swq(parser, pipeline, k, index) - elif "TM" in v: - value = "TXQ{0}.0 TM{1}".format(port, index) - else: - value = "TXQ{0}.0".format(port) - - parser.set(pipeline, k, value) - - new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline) - if new_pipeline != pipeline: - parser._sections[new_pipeline] = parser._sections[pipeline] - parser._sections.pop(pipeline) - self.n_pipeline += 1 - return parser - - def create_vpe_config(self, vnf_cfg): - config = configparser.ConfigParser() - vpe_cfg = os.path.join("/tmp/vpe_config") - with open(vpe_cfg, 'w') as cfg_file: - config = self.vpe_initialize(config) - config = self.vpe_rxq(config) - config.write(cfg_file) - for index in range(0, len(self.uplink_ports)): - config = self.vpe_upstream(vnf_cfg, index) - config.write(cfg_file) - config = self.vpe_downstream(vnf_cfg, index) - config = self.vpe_tmq(config, index) - config.write(cfg_file) def generate_vpe_script(self, interfaces): rules = PipelineRules(pipeline_id=1) @@ -209,16 +85,10 @@ class ConfigCreate(object): return rules.get_string() - def generate_tm_cfg(self, vnf_cfg, index=0): - vnf_cfg = os.path.join(vnf_cfg, "full_tm_profile_10G.cfg") - if os.path.exists(vnf_cfg): - return open(vnf_cfg).read() - class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): APP_NAME = 'vPE_vnf' - CFG_CONFIG = "/tmp/vpe_config" CFG_SCRIPT = "/tmp/vpe_script" TM_CONFIG = "/tmp/full_tm_profile_10G.cfg" CORES = ['0', '1', '2', '3', '4', '5'] @@ -231,33 +101,52 @@ class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): self.all_ports = self._port_pairs.all_ports def build_config(self): + vnf_cfg = self.scenario_helper.vnf_cfg + task_path = self.scenario_helper.task_path + action_bulk_file = vnf_cfg.get('action_bulk_file', '/tmp/action_bulk_512.txt') + full_tm_profile_file = vnf_cfg.get('full_tm_profile_file', '/tmp/full_tm_profile_10G.cfg') + config_file = vnf_cfg.get('file', '/tmp/vpe_config') + script_file = vnf_cfg.get('script_file', None) vpe_vars = { "bin_path": self.ssh_helper.bin_path, "socket": self.socket, } - self._build_vnf_ports() vpe_conf = ConfigCreate(self.vnfd_helper, self.socket) - vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg) - config_basename = posixpath.basename(self.CFG_CONFIG) - script_basename = posixpath.basename(self.CFG_SCRIPT) - tm_basename = posixpath.basename(self.TM_CONFIG) - with open(self.CFG_CONFIG) as handle: + if script_file is None: + # autogenerate vpe_script if not given + vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces) + script_file = self.CFG_SCRIPT + else: + with utils.open_relative_file(script_file, task_path) as handle: + vpe_script = handle.read() + + config_basename = posixpath.basename(config_file) + script_basename = posixpath.basename(script_file) + + with utils.open_relative_file(action_bulk_file, task_path) as handle: + action_bulk = handle.read() + + with utils.open_relative_file(full_tm_profile_file, task_path) as handle: + full_tm_profile = handle.read() + + with utils.open_relative_file(config_file, task_path) as handle: vpe_config = handle.read() + # upload the 4 config files to the target server self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars)) - - vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces) self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars)) - - tm_config = vpe_conf.generate_tm_cfg(self.scenario_helper.vnf_cfg) - self.ssh_helper.upload_config_file(tm_basename, tm_config) + self.ssh_helper.upload_config_file(posixpath.basename(action_bulk_file), + action_bulk.format(**vpe_vars)) + self.ssh_helper.upload_config_file(posixpath.basename(full_tm_profile_file), + full_tm_profile.format(**vpe_vars)) LOG.info("Provision and start the %s", self.APP_NAME) - LOG.info(self.CFG_CONFIG) + LOG.info(config_file) LOG.info(self.CFG_SCRIPT) - self._build_pipeline_kwargs() + self._build_pipeline_kwargs(cfg_file='/tmp/' + config_basename, + script='/tmp/' + script_basename) return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs) @@ -281,7 +170,11 @@ class VpeApproxVnf(SampleVNF): def collect_kpi(self): # we can't get KPIs if the VNF is down check_if_process_failed(self._vnf_process) + physical_node = ctx_base.Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + result = { + "physical_node": physical_node, 'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0, 'pkt_in_down_stream': 0, diff --git a/yardstick/network_services/vnf_generic/vnf/vpp_helpers.py b/yardstick/network_services/vnf_generic/vnf/vpp_helpers.py new file mode 100644 index 000000000..fe8e7b2ba --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/vpp_helpers.py @@ -0,0 +1,751 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import binascii +import ipaddress +import json +import logging +import os +import re +import tempfile +import time +from collections import OrderedDict + +from yardstick.common import constants +from yardstick.common import exceptions +from yardstick.network_services.helpers.cpu import CpuSysCores +from yardstick.network_services.vnf_generic.vnf.sample_vnf import \ + DpdkVnfSetupEnvHelper + +LOG = logging.getLogger(__name__) + + +class VppConfigGenerator(object): + VPP_LOG_FILE = '/tmp/vpe.log' + + def __init__(self): + self._nodeconfig = {} + self._vpp_config = '' + + def add_config_item(self, config, value, path): + if len(path) == 1: + config[path[0]] = value + return + if path[0] not in config: + config[path[0]] = {} + elif isinstance(config[path[0]], str): + config[path[0]] = {} if config[path[0]] == '' \ + else {config[path[0]]: ''} + self.add_config_item(config[path[0]], value, path[1:]) + + def add_unix_log(self, value=None): + path = ['unix', 'log'] + if value is None: + value = self.VPP_LOG_FILE + self.add_config_item(self._nodeconfig, value, path) + + def add_unix_cli_listen(self, value='/run/vpp/cli.sock'): + path = ['unix', 'cli-listen'] + self.add_config_item(self._nodeconfig, value, path) + + def add_unix_nodaemon(self): + path = ['unix', 'nodaemon'] + self.add_config_item(self._nodeconfig, '', path) + + def add_unix_coredump(self): + path = ['unix', 'full-coredump'] + self.add_config_item(self._nodeconfig, '', path) + + def add_dpdk_dev(self, *devices): + for device in devices: + if VppConfigGenerator.pci_dev_check(device): + path = ['dpdk', 'dev {0}'.format(device)] + self.add_config_item(self._nodeconfig, '', path) + + def add_dpdk_cryptodev(self, count, cryptodev): + for i in range(count): + cryptodev_config = 'dev {0}'.format( + re.sub(r'\d.\d$', '1.' + str(i), cryptodev)) + path = ['dpdk', cryptodev_config] + self.add_config_item(self._nodeconfig, '', path) + self.add_dpdk_uio_driver('igb_uio') + + def add_dpdk_sw_cryptodev(self, sw_pmd_type, socket_id, count): + for _ in range(count): + cryptodev_config = 'vdev cryptodev_{0}_pmd,socket_id={1}'. \ + format(sw_pmd_type, str(socket_id)) + path = ['dpdk', cryptodev_config] + self.add_config_item(self._nodeconfig, '', path) + + def add_dpdk_dev_default_rxq(self, value): + path = ['dpdk', 'dev default', 'num-rx-queues'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_dev_default_rxd(self, value): + path = ['dpdk', 'dev default', 'num-rx-desc'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_dev_default_txd(self, value): + path = ['dpdk', 'dev default', 'num-tx-desc'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_log_level(self, value): + path = ['dpdk', 'log-level'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_socketmem(self, value): + path = ['dpdk', 'socket-mem'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_num_mbufs(self, value): + path = ['dpdk', 'num-mbufs'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_uio_driver(self, value=None): + path = ['dpdk', 'uio-driver'] + self.add_config_item(self._nodeconfig, value, path) + + def add_cpu_main_core(self, value): + path = ['cpu', 'main-core'] + self.add_config_item(self._nodeconfig, value, path) + + def add_cpu_corelist_workers(self, value): + path = ['cpu', 'corelist-workers'] + self.add_config_item(self._nodeconfig, value, path) + + def add_heapsize(self, value): + path = ['heapsize'] + self.add_config_item(self._nodeconfig, value, path) + + def add_ip6_hash_buckets(self, value): + path = ['ip6', 'hash-buckets'] + self.add_config_item(self._nodeconfig, value, path) + + def add_ip6_heap_size(self, value): + path = ['ip6', 'heap-size'] + self.add_config_item(self._nodeconfig, value, path) + + def add_ip_heap_size(self, value): + path = ['ip', 'heap-size'] + self.add_config_item(self._nodeconfig, value, path) + + def add_statseg_size(self, value): + path = ['statseg', 'size'] + self.add_config_item(self._nodeconfig, value, path) + + def add_plugin(self, state, *plugins): + for plugin in plugins: + path = ['plugins', 'plugin {0}'.format(plugin), state] + self.add_config_item(self._nodeconfig, ' ', path) + + def add_dpdk_no_multi_seg(self): + path = ['dpdk', 'no-multi-seg'] + self.add_config_item(self._nodeconfig, '', path) + + def add_dpdk_no_tx_checksum_offload(self): + path = ['dpdk', 'no-tx-checksum-offload'] + self.add_config_item(self._nodeconfig, '', path) + + def dump_config(self, obj=None, level=-1): + if obj is None: + obj = self._nodeconfig + obj = OrderedDict(sorted(obj.items())) + + indent = ' ' + if level >= 0: + self._vpp_config += '{}{{\n'.format(level * indent) + if isinstance(obj, dict): + for key, val in obj.items(): + if hasattr(val, '__iter__') and not isinstance(val, str): + self._vpp_config += '{}{}\n'.format((level + 1) * indent, + key) + self.dump_config(val, level + 1) + else: + self._vpp_config += '{}{} {}\n'.format( + (level + 1) * indent, + key, val) + if level >= 0: + self._vpp_config += '{}}}\n'.format(level * indent) + + return self._vpp_config + + @staticmethod + def pci_dev_check(pci_dev): + pattern = re.compile("^[0-9A-Fa-f]{4}:[0-9A-Fa-f]{2}:" + "[0-9A-Fa-f]{2}\\.[0-9A-Fa-f]$") + if not pattern.match(pci_dev): + raise ValueError('PCI address {addr} is not in valid format ' + 'xxxx:xx:xx.x'.format(addr=pci_dev)) + return True + + +class VppSetupEnvHelper(DpdkVnfSetupEnvHelper): + APP_NAME = "vpp" + CFG_CONFIG = "/etc/vpp/startup.conf" + CFG_SCRIPT = "" + PIPELINE_COMMAND = "" + QAT_DRIVER = "qat_dh895xcc" + VNF_TYPE = "IPSEC" + VAT_BIN_NAME = 'vpp_api_test' + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(VppSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, + scenario_helper) + self.sys_cores = CpuSysCores(self.ssh_helper) + + def kill_vnf(self): + ret_code, _, _ = \ + self.ssh_helper.execute( + 'service {name} stop'.format(name=self.APP_NAME)) + if int(ret_code): + raise RuntimeError( + 'Failed to stop service {name}'.format(name=self.APP_NAME)) + + def tear_down(self): + pass + + def start_vpp_service(self): + ret_code, _, _ = \ + self.ssh_helper.execute( + 'service {name} restart'.format(name=self.APP_NAME)) + if int(ret_code): + raise RuntimeError( + 'Failed to start service {name}'.format(name=self.APP_NAME)) + + def _update_vnfd_helper(self, additional_data, iface_key=None): + for k, v in additional_data.items(): + if iface_key is None: + if isinstance(v, dict) and k in self.vnfd_helper: + self.vnfd_helper[k].update(v) + else: + self.vnfd_helper[k] = v + else: + if isinstance(v, + dict) and k in self.vnfd_helper.find_virtual_interface( + ifname=iface_key): + self.vnfd_helper.find_virtual_interface(ifname=iface_key)[ + k].update(v) + else: + self.vnfd_helper.find_virtual_interface(ifname=iface_key)[ + k] = v + + def get_value_by_interface_key(self, interface, key): + try: + return self.vnfd_helper.find_virtual_interface( + ifname=interface).get(key) + except (KeyError, ValueError): + return None + + def crypto_device_init(self, pci_addr, numvfs): + # QAT device must be re-bound to kernel driver before initialization. + self.dpdk_bind_helper.load_dpdk_driver(self.QAT_DRIVER) + + # Stop VPP to prevent deadlock. + self.kill_vnf() + + current_driver = self.get_pci_dev_driver(pci_addr.replace(':', r'\:')) + if current_driver is not None: + self.pci_driver_unbind(pci_addr) + + # Bind to kernel driver. + self.dpdk_bind_helper.bind(pci_addr, self.QAT_DRIVER.replace('qat_', '')) + + # Initialize QAT VFs. + if numvfs > 0: + self.set_sriov_numvfs(pci_addr, numvfs) + + def get_sriov_numvfs(self, pf_pci_addr): + command = 'cat /sys/bus/pci/devices/{pci}/sriov_numvfs'. \ + format(pci=pf_pci_addr.replace(':', r'\:')) + _, stdout, _ = self.ssh_helper.execute(command) + try: + return int(stdout) + except ValueError: + LOG.debug('Reading sriov_numvfs info failed') + return 0 + + def set_sriov_numvfs(self, pf_pci_addr, numvfs=0): + command = "sh -c 'echo {num} | tee /sys/bus/pci/devices/{pci}/sriov_numvfs'". \ + format(num=numvfs, pci=pf_pci_addr.replace(':', r'\:')) + self.ssh_helper.execute(command) + + def pci_driver_unbind(self, pci_addr): + command = "sh -c 'echo {pci} | tee /sys/bus/pci/devices/{pcie}/driver/unbind'". \ + format(pci=pci_addr, pcie=pci_addr.replace(':', r'\:')) + self.ssh_helper.execute(command) + + def get_pci_dev_driver(self, pci_addr): + cmd = 'lspci -vmmks {0}'.format(pci_addr) + ret_code, stdout, _ = self.ssh_helper.execute(cmd) + if int(ret_code): + raise RuntimeError("'{0}' failed".format(cmd)) + for line in stdout.splitlines(): + if not line: + continue + name = None + value = None + try: + name, value = line.split("\t", 1) + except ValueError: + if name == "Driver:": + return None + if name == 'Driver:': + return value + return None + + def vpp_create_ipsec_tunnels(self, if1_ip_addr, if2_ip_addr, if_name, + n_tunnels, n_connections, crypto_alg, + crypto_key, integ_alg, integ_key, addrs_ip, + spi_1=10000, spi_2=20000): + mask_length = 32 + if n_connections <= n_tunnels: + count = 1 + else: + count = int(n_connections / n_tunnels) + addr_ip_i = int(ipaddress.ip_address(str(addrs_ip))) + dst_start_ip = addr_ip_i + + tmp_fd, tmp_path = tempfile.mkstemp() + + vpp_ifname = self.get_value_by_interface_key(if_name, 'vpp_name') + ckey = binascii.hexlify(crypto_key.encode()) + ikey = binascii.hexlify(integ_key.encode()) + + integ = '' + if crypto_alg.alg_name != 'aes-gcm-128': + integ = 'integ_alg {integ_alg} ' \ + 'local_integ_key {local_integ_key} ' \ + 'remote_integ_key {remote_integ_key} ' \ + .format(integ_alg=integ_alg.alg_name, + local_integ_key=ikey, + remote_integ_key=ikey) + create_tunnels_cmds = 'ipsec_tunnel_if_add_del ' \ + 'local_spi {local_spi} ' \ + 'remote_spi {remote_spi} ' \ + 'crypto_alg {crypto_alg} ' \ + 'local_crypto_key {local_crypto_key} ' \ + 'remote_crypto_key {remote_crypto_key} ' \ + '{integ} ' \ + 'local_ip {local_ip} ' \ + 'remote_ip {remote_ip}\n' + start_tunnels_cmds = 'ip_add_del_route {raddr}/{mask} via {addr} ipsec{i}\n' \ + 'exec set interface unnumbered ipsec{i} use {uifc}\n' \ + 'sw_interface_set_flags ipsec{i} admin-up\n' + + with os.fdopen(tmp_fd, 'w') as tmp_file: + for i in range(0, n_tunnels): + create_tunnel = create_tunnels_cmds.format(local_spi=spi_1 + i, + remote_spi=spi_2 + i, + crypto_alg=crypto_alg.alg_name, + local_crypto_key=ckey, + remote_crypto_key=ckey, + integ=integ, + local_ip=if1_ip_addr, + remote_ip=if2_ip_addr) + tmp_file.write(create_tunnel) + self.execute_script(tmp_path, json_out=False, copy_on_execute=True) + os.remove(tmp_path) + + tmp_fd, tmp_path = tempfile.mkstemp() + + with os.fdopen(tmp_fd, 'w') as tmp_file: + for i in range(0, n_tunnels): + if count > 1: + dst_start_ip = addr_ip_i + i * count + dst_end_ip = ipaddress.ip_address(dst_start_ip + count - 1) + ips = [ipaddress.ip_address(ip) for ip in + [str(ipaddress.ip_address(dst_start_ip)), + str(dst_end_ip)]] + lowest_ip, highest_ip = min(ips), max(ips) + mask_length = self.get_prefix_length(int(lowest_ip), + int(highest_ip), + lowest_ip.max_prefixlen) + # TODO check duplicate route for some IPs + elif count == 1: + dst_start_ip = addr_ip_i + i + start_tunnel = start_tunnels_cmds.format( + raddr=str(ipaddress.ip_address(dst_start_ip)), + mask=mask_length, + addr=if2_ip_addr, + i=i, count=count, + uifc=vpp_ifname) + tmp_file.write(start_tunnel) + # TODO add route for remain IPs + + self.execute_script(tmp_path, json_out=False, copy_on_execute=True) + os.remove(tmp_path) + + def apply_config(self, vpp_cfg, restart_vpp=True): + vpp_config = vpp_cfg.dump_config() + ret, _, _ = \ + self.ssh_helper.execute('echo "{config}" | sudo tee {filename}'. + format(config=vpp_config, + filename=self.CFG_CONFIG)) + if ret != 0: + raise RuntimeError('Writing config file failed') + if restart_vpp: + self.start_vpp_service() + + def vpp_route_add(self, network, prefix_len, gateway=None, interface=None, + use_sw_index=True, resolve_attempts=10, + count=1, vrf=None, lookup_vrf=None, multipath=False, + weight=None, local=False): + if interface: + if use_sw_index: + int_cmd = ('sw_if_index {}'.format( + self.get_value_by_interface_key(interface, + 'vpp_sw_index'))) + else: + int_cmd = interface + else: + int_cmd = '' + + rap = 'resolve-attempts {}'.format(resolve_attempts) \ + if resolve_attempts else '' + + via = 'via {}'.format(gateway) if gateway else '' + + cnt = 'count {}'.format(count) \ + if count else '' + + vrf = 'vrf {}'.format(vrf) if vrf else '' + + lookup_vrf = 'lookup-in-vrf {}'.format( + lookup_vrf) if lookup_vrf else '' + + multipath = 'multipath' if multipath else '' + + weight = 'weight {}'.format(weight) if weight else '' + + local = 'local' if local else '' + + with VatTerminal(self.ssh_helper, json_param=False) as vat: + vat.vat_terminal_exec_cmd_from_template('add_route.vat', + network=network, + prefix_length=prefix_len, + via=via, + vrf=vrf, + interface=int_cmd, + resolve_attempts=rap, + count=cnt, + lookup_vrf=lookup_vrf, + multipath=multipath, + weight=weight, + local=local) + + def add_arp_on_dut(self, iface_key, ip_address, mac_address): + with VatTerminal(self.ssh_helper) as vat: + return vat.vat_terminal_exec_cmd_from_template( + 'add_ip_neighbor.vat', + sw_if_index=self.get_value_by_interface_key(iface_key, + 'vpp_sw_index'), + ip_address=ip_address, mac_address=mac_address) + + def set_ip(self, interface, address, prefix_length): + with VatTerminal(self.ssh_helper) as vat: + return vat.vat_terminal_exec_cmd_from_template( + 'add_ip_address.vat', + sw_if_index=self.get_value_by_interface_key(interface, + 'vpp_sw_index'), + address=address, prefix_length=prefix_length) + + def set_interface_state(self, interface, state): + sw_if_index = self.get_value_by_interface_key(interface, + 'vpp_sw_index') + + if state == 'up': + state = 'admin-up link-up' + elif state == 'down': + state = 'admin-down link-down' + else: + raise ValueError('Unexpected interface state: {}'.format(state)) + with VatTerminal(self.ssh_helper) as vat: + return vat.vat_terminal_exec_cmd_from_template( + 'set_if_state.vat', sw_if_index=sw_if_index, state=state) + + def vpp_set_interface_mtu(self, interface, mtu=9200): + sw_if_index = self.get_value_by_interface_key(interface, + 'vpp_sw_index') + if sw_if_index: + with VatTerminal(self.ssh_helper, json_param=False) as vat: + vat.vat_terminal_exec_cmd_from_template( + "hw_interface_set_mtu.vat", sw_if_index=sw_if_index, + mtu=mtu) + + def vpp_interfaces_ready_wait(self, timeout=30): + if_ready = False + not_ready = [] + start = time.time() + while not if_ready: + out = self.vpp_get_interface_data() + if time.time() - start > timeout: + for interface in out: + if interface.get('admin_up_down') == 1: + if interface.get('link_up_down') != 1: + LOG.debug('%s link-down', + interface.get('interface_name')) + raise RuntimeError('timeout, not up {0}'.format(not_ready)) + not_ready = [] + for interface in out: + if interface.get('admin_up_down') == 1: + if interface.get('link_up_down') != 1: + not_ready.append(interface.get('interface_name')) + if not not_ready: + if_ready = True + else: + LOG.debug('Interfaces still in link-down state: %s, ' + 'waiting...', not_ready) + time.sleep(1) + + def vpp_get_interface_data(self, interface=None): + with VatTerminal(self.ssh_helper) as vat: + response = vat.vat_terminal_exec_cmd_from_template( + "interface_dump.vat") + data = response[0] + if interface is not None: + if isinstance(interface, str): + param = "interface_name" + elif isinstance(interface, int): + param = "sw_if_index" + else: + raise TypeError + for data_if in data: + if data_if[param] == interface: + return data_if + return dict() + return data + + def update_vpp_interface_data(self): + data = {} + interface_dump_json = self.execute_script_json_out( + "dump_interfaces.vat") + interface_list = json.loads(interface_dump_json) + for interface in self.vnfd_helper.interfaces: + if_mac = interface['virtual-interface']['local_mac'] + interface_dict = VppSetupEnvHelper.get_vpp_interface_by_mac( + interface_list, if_mac) + if not interface_dict: + LOG.debug('Interface %s not found by MAC %s', interface, + if_mac) + continue + data[interface['virtual-interface']['ifname']] = { + 'vpp_name': interface_dict["interface_name"], + 'vpp_sw_index': interface_dict["sw_if_index"] + } + for iface_key, updated_vnfd in data.items(): + self._update_vnfd_helper(updated_vnfd, iface_key) + + def iface_update_numa(self): + iface_numa = {} + for interface in self.vnfd_helper.interfaces: + cmd = "cat /sys/bus/pci/devices/{}/numa_node".format( + interface["virtual-interface"]["vpci"]) + ret, out, _ = self.ssh_helper.execute(cmd) + if ret == 0: + try: + numa_node = int(out) + if numa_node < 0: + if self.vnfd_helper["cpuinfo"][-1][3] + 1 == 1: + iface_numa[ + interface['virtual-interface']['ifname']] = { + 'numa_node': 0 + } + else: + raise ValueError + else: + iface_numa[ + interface['virtual-interface']['ifname']] = { + 'numa_node': numa_node + } + except ValueError: + LOG.debug( + 'Reading numa location failed for: %s', + interface["virtual-interface"]["vpci"]) + for iface_key, updated_vnfd in iface_numa.items(): + self._update_vnfd_helper(updated_vnfd, iface_key) + + def execute_script(self, vat_name, json_out=True, copy_on_execute=False): + if copy_on_execute: + self.ssh_helper.put_file(vat_name, vat_name) + remote_file_path = vat_name + else: + vat_path = self.ssh_helper.join_bin_path("vpp", "templates") + remote_file_path = '{0}/{1}'.format(vat_path, vat_name) + + cmd = "{vat_bin} {json} in {vat_path} script".format( + vat_bin=self.VAT_BIN_NAME, + json="json" if json_out is True else "", + vat_path=remote_file_path) + + try: + return self.ssh_helper.execute(cmd=cmd) + except Exception: + raise RuntimeError("VAT script execution failed: {0}".format(cmd)) + + def execute_script_json_out(self, vat_name): + vat_path = self.ssh_helper.join_bin_path("vpp", "templates") + remote_file_path = '{0}/{1}'.format(vat_path, vat_name) + + _, stdout, _ = self.execute_script(vat_name, json_out=True) + return self.cleanup_vat_json_output(stdout, vat_file=remote_file_path) + + @staticmethod + def cleanup_vat_json_output(json_output, vat_file=None): + retval = json_output + clutter = ['vat#', 'dump_interface_table error: Misc', + 'dump_interface_table:6019: JSON output supported only ' \ + 'for VPE API calls and dump_stats_table'] + if vat_file: + clutter.append("{0}(2):".format(vat_file)) + for garbage in clutter: + retval = retval.replace(garbage, '') + return retval.strip() + + @staticmethod + def _convert_mac_to_number_list(mac_address): + list_mac = [] + for num in mac_address.split(":"): + list_mac.append(int(num, 16)) + return list_mac + + @staticmethod + def get_vpp_interface_by_mac(interfaces_list, mac_address): + interface_dict = {} + list_mac_address = VppSetupEnvHelper._convert_mac_to_number_list( + mac_address) + LOG.debug("MAC address %s converted to list %s.", mac_address, + list_mac_address) + for interface in interfaces_list: + # TODO: create vat json integrity checking and move there + if "l2_address" not in interface: + raise KeyError( + "key l2_address not found in interface dict." + "Probably input list is not parsed from correct VAT " + "json output.") + if "l2_address_length" not in interface: + raise KeyError( + "key l2_address_length not found in interface " + "dict. Probably input list is not parsed from correct " + "VAT json output.") + mac_from_json = interface["l2_address"][:6] + if mac_from_json == list_mac_address: + if interface["l2_address_length"] != 6: + raise ValueError("l2_address_length value is not 6.") + interface_dict = interface + break + return interface_dict + + @staticmethod + def get_prefix_length(number1, number2, bits): + for i in range(bits): + if number1 >> i == number2 >> i: + return bits - i + return 0 + + +class VatTerminal(object): + + __VAT_PROMPT = ("vat# ",) + __LINUX_PROMPT = (":~# ", ":~$ ", "~]$ ", "~]# ") + + + def __init__(self, ssh_helper, json_param=True): + json_text = ' json' if json_param else '' + self.json = json_param + self.ssh_helper = ssh_helper + EXEC_RETRY = 3 + + try: + self._tty = self.ssh_helper.interactive_terminal_open() + except Exception: + raise RuntimeError("Cannot open interactive terminal") + + for _ in range(EXEC_RETRY): + try: + self.ssh_helper.interactive_terminal_exec_command( + self._tty, + 'sudo -S {0}{1}'.format(VppSetupEnvHelper.VAT_BIN_NAME, + json_text), + self.__VAT_PROMPT) + except exceptions.SSHTimeout: + continue + else: + break + + self._exec_failure = False + self.vat_stdout = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.vat_terminal_close() + + def vat_terminal_exec_cmd(self, cmd): + try: + out = self.ssh_helper.interactive_terminal_exec_command(self._tty, + cmd, + self.__VAT_PROMPT) + self.vat_stdout = out + except exceptions.SSHTimeout: + self._exec_failure = True + raise RuntimeError( + "VPP is not running on node. VAT command {0} execution failed". + format(cmd)) + if self.json: + obj_start = out.find('{') + obj_end = out.rfind('}') + array_start = out.find('[') + array_end = out.rfind(']') + + if obj_start == -1 and array_start == -1: + raise RuntimeError( + "VAT command {0}: no JSON data.".format(cmd)) + + if obj_start < array_start or array_start == -1: + start = obj_start + end = obj_end + 1 + else: + start = array_start + end = array_end + 1 + out = out[start:end] + json_out = json.loads(out) + return json_out + else: + return None + + def vat_terminal_close(self): + if not self._exec_failure: + try: + self.ssh_helper.interactive_terminal_exec_command(self._tty, + 'quit', + self.__LINUX_PROMPT) + except exceptions.SSHTimeout: + raise RuntimeError("Failed to close VAT console") + try: + self.ssh_helper.interactive_terminal_close(self._tty) + except Exception: + raise RuntimeError("Cannot close interactive terminal") + + def vat_terminal_exec_cmd_from_template(self, vat_template_file, **args): + file_path = os.path.join(constants.YARDSTICK_ROOT_PATH, + 'yardstick/resources/templates/', + vat_template_file) + with open(file_path, 'r') as template_file: + cmd_template = template_file.readlines() + ret = [] + for line_tmpl in cmd_template: + vat_cmd = line_tmpl.format(**args) + ret.append(self.vat_terminal_exec_cmd(vat_cmd.replace('\n', ''))) + return ret |