diff options
Diffstat (limited to 'yardstick/network_services')
31 files changed, 1733 insertions, 1322 deletions
diff --git a/yardstick/network_services/collector/subscriber.py b/yardstick/network_services/collector/subscriber.py index 322b3f5a2..0c6d97771 100644 --- a/yardstick/network_services/collector/subscriber.py +++ b/yardstick/network_services/collector/subscriber.py @@ -11,20 +11,40 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""This module implements stub for publishing results in yardstick format.""" + import logging +from yardstick.network_services.nfvi.resource import ResourceProfile +from yardstick.network_services.utils import get_nsb_option + + LOG = logging.getLogger(__name__) class Collector(object): """Class that handles dictionary of results in yardstick-plot format.""" - def __init__(self, vnfs): + def __init__(self, vnfs, contexts_nodes, timeout=3600): super(Collector, self).__init__() self.vnfs = vnfs + self.nodes = contexts_nodes + self.bin_path = get_nsb_option('bin_path', '') + self.resource_profiles = {} + + for ctx_name, nodes in ((ctx_name, nodes) for (ctx_name, nodes) + in contexts_nodes.items() if nodes): + for node in (node for node in nodes + if node and node.get('collectd')): + name = ".".join([node['name'], ctx_name]) + self.resource_profiles.update( + {name: ResourceProfile.make_from_node(node, timeout)}) def start(self): + for resource in self.resource_profiles.values(): + resource.initiate_systemagent(self.bin_path) + resource.start() + resource.amqp_process_for_nfvi_kpi() + for vnf in self.vnfs: vnf.start_collect() @@ -32,6 +52,9 @@ class Collector(object): for vnf in self.vnfs: vnf.stop_collect() + for resource in self.resource_profiles.values(): + resource.stop() + def get_kpi(self): """Returns dictionary of results in yardstick-plot format @@ -42,7 +65,12 @@ class Collector(object): for vnf in self.vnfs: # Result example: # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }} - LOG.debug("collect KPI for %s", vnf.name) + LOG.debug("collect KPI for vnf %s", vnf.name) results[vnf.name] = vnf.collect_kpi() + for node_name, resource in self.resource_profiles.items(): + LOG.debug("collect KPI for nfvi_node %s", node_name) + results[node_name] = {"core": resource.amqp_collect_nfvi_kpi()} + LOG.debug("%s collect KPIs %s", node_name, results[node_name]['core']) + return results diff --git a/yardstick/network_services/helpers/dpdkbindnic_helper.py b/yardstick/network_services/helpers/dpdkbindnic_helper.py index 05b822c2e..1c74355ef 100644 --- a/yardstick/network_services/helpers/dpdkbindnic_helper.py +++ b/yardstick/network_services/helpers/dpdkbindnic_helper.py @@ -18,12 +18,9 @@ import re from collections import defaultdict from itertools import chain +from yardstick.common import exceptions from yardstick.common.utils import validate_non_string_sequence -from yardstick.error import IncorrectConfig -from yardstick.error import IncorrectSetup -from yardstick.error import IncorrectNodeSetup -from yardstick.error import SSHTimeout -from yardstick.error import SSHError + NETWORK_KERNEL = 'network_kernel' NETWORK_DPDK = 'network_dpdk' @@ -51,7 +48,7 @@ class DpdkInterface(object): try: assert self.local_mac except (AssertionError, KeyError): - raise IncorrectConfig + raise exceptions.IncorrectConfig(error_msg='') @property def local_mac(self): @@ -98,10 +95,12 @@ class DpdkInterface(object): # if we don't find all the keys then don't update pass - except (IncorrectNodeSetup, SSHError, SSHTimeout): - raise IncorrectConfig( - "Unable to probe missing interface fields '%s', on node %s " - "SSH Error" % (', '.join(self.missing_fields), self.dpdk_node.node_key)) + except (exceptions.IncorrectNodeSetup, exceptions.SSHError, + exceptions.SSHTimeout): + message = ('Unable to probe missing interface fields "%s", on ' + 'node %s SSH Error' % (', '.join(self.missing_fields), + self.dpdk_node.node_key)) + raise exceptions.IncorrectConfig(error_msg=message) class DpdkNode(object): @@ -118,11 +117,12 @@ class DpdkNode(object): try: self.dpdk_interfaces = {intf['name']: DpdkInterface(self, intf['virtual-interface']) for intf in self.interfaces} - except IncorrectConfig: + except exceptions.IncorrectConfig: template = "MAC address is required for all interfaces, missing on: {}" errors = (intf['name'] for intf in self.interfaces if 'local_mac' not in intf['virtual-interface']) - raise IncorrectSetup(template.format(", ".join(errors))) + raise exceptions.IncorrectSetup( + error_msg=template.format(", ".join(errors))) @property def dpdk_helper(self): @@ -176,7 +176,7 @@ class DpdkNode(object): self._probe_netdevs() try: self._probe_missing_values() - except IncorrectConfig: + except exceptions.IncorrectConfig: # ignore for now pass @@ -193,7 +193,7 @@ class DpdkNode(object): missing_fields) errors = "\n".join(errors) if errors: - raise IncorrectSetup(errors) + raise exceptions.IncorrectSetup(error_msg=errors) finally: self._dpdk_helper = None diff --git a/yardstick/network_services/helpers/samplevnf_helper.py b/yardstick/network_services/helpers/samplevnf_helper.py index 0ab10d7b7..8e6a3a3ea 100644 --- a/yardstick/network_services/helpers/samplevnf_helper.py +++ b/yardstick/network_services/helpers/samplevnf_helper.py @@ -23,8 +23,7 @@ from itertools import chain, repeat import six from six.moves.configparser import ConfigParser - -from yardstick.common.utils import ip_to_hex +from yardstick.common import utils LOG = logging.getLogger(__name__) @@ -34,19 +33,6 @@ link {0} config {1} {2} link {0} up """ -ACTION_TEMPLATE = """\ -p action add {0} accept -p action add {0} fwd {0} -p action add {0} count -""" - -FW_ACTION_TEMPLATE = """\ -p action add {0} accept -p action add {0} fwd {0} -p action add {0} count -p action add {0} conntrack -""" - # This sets up a basic passthrough with no rules SCRIPT_TPL = """ {link_config} @@ -59,9 +45,7 @@ SCRIPT_TPL = """ {arp_route_tbl6} -{actions} - -{rules} +{flows} """ @@ -182,26 +166,9 @@ class MultiPortConfig(object): return parser.get(section, key) return default - @staticmethod - def make_ip_addr(ip, mask): - """ - :param ip: ip adddress - :type ip: str - :param mask: /24 prefix of 255.255.255.0 netmask - :type mask: str - :return: interface - :rtype: IPv4Interface - """ - - try: - return ipaddress.ip_interface(six.text_type('/'.join([ip, mask]))) - except (TypeError, ValueError): - # None so we can skip later - return None - @classmethod def validate_ip_and_prefixlen(cls, ip_addr, prefixlen): - ip_addr = cls.make_ip_addr(ip_addr, prefixlen) + ip_addr = utils.make_ip_addr(ip_addr, prefixlen) return ip_addr.ip.exploded, ip_addr.network.prefixlen def __init__(self, topology_file, config_tpl, tmp_file, vnfd_helper, @@ -245,7 +212,7 @@ class MultiPortConfig(object): self.ports_len = 0 self.prv_que_handler = None self.vnfd = None - self.rules = None + self.flows = None self.pktq_out = [] @staticmethod @@ -360,7 +327,7 @@ class MultiPortConfig(object): "%s/%s" % (interface["dst_ip"], interface["netmask"]))) arp_vars = { - "port_netmask_hex": ip_to_hex(dst_port_ip.network.netmask.exploded), + "port_netmask_hex": utils.ip_to_hex(dst_port_ip.network.netmask.exploded), # this is the port num that contains port0 subnet and next_hop_ip_hex # this is LINKID which should be based on DPDK port number "port_num": dpdk_port_num, @@ -542,7 +509,7 @@ class MultiPortConfig(object): self.update_write_parser(self.loadb_tpl) self.start_core += 1 - for i in range(self.worker_threads): + for _ in range(self.worker_threads): vnf_data = self.generate_vnf_data() if not self.vnf_tpl: self.vnf_tpl = {} @@ -637,65 +604,8 @@ class MultiPortConfig(object): return '\n'.join(('p {3} arpadd {0} {1} {2}'.format(*values) for values in arp_config6)) - def generate_action_config(self): - port_list = (self.vnfd_helper.port_num(p) for p in self.all_ports) - if self.vnf_type == "VFW": - template = FW_ACTION_TEMPLATE - else: - template = ACTION_TEMPLATE - - return ''.join((template.format(port) for port in port_list)) - - def get_ip_from_port(self, port): - # we can't use gateway because in OpenStack gateways interfer with floating ip routing - # return self.make_ip_addr(self.get_ports_gateway(port), self.get_netmask_gateway(port)) - vintf = self.vnfd_helper.find_interface(name=port)["virtual-interface"] - ip = vintf["local_ip"] - netmask = vintf["netmask"] - return self.make_ip_addr(ip, netmask) - - def get_network_and_prefixlen_from_ip_of_port(self, port): - ip_addr = self.get_ip_from_port(port) - # handle cases with no gateway - if ip_addr: - return ip_addr.network.network_address.exploded, ip_addr.network.prefixlen - else: - return None, None - - def generate_rule_config(self): - cmd = 'acl' if self.vnf_type == "ACL" else "vfw" - rules_config = self.rules if self.rules else '' - new_rules = [] - new_ipv6_rules = [] - pattern = 'p {0} add {1} {2} {3} {4} {5} 0 65535 0 65535 0 0 {6}' - for src_intf, dst_intf in self.port_pair_list: - src_port = self.vnfd_helper.port_num(src_intf) - dst_port = self.vnfd_helper.port_num(dst_intf) - - src_net, src_prefix_len = self.get_network_and_prefixlen_from_ip_of_port(src_intf) - dst_net, dst_prefix_len = self.get_network_and_prefixlen_from_ip_of_port(dst_intf) - # ignore entires with empty values - if all((src_net, src_prefix_len, dst_net, dst_prefix_len)): - new_rules.append((cmd, self.txrx_pipeline, src_net, src_prefix_len, - dst_net, dst_prefix_len, dst_port)) - new_rules.append((cmd, self.txrx_pipeline, dst_net, dst_prefix_len, - src_net, src_prefix_len, src_port)) - - # src_net = self.get_ports_gateway6(port_pair[0]) - # src_prefix_len = self.get_netmask_gateway6(port_pair[0]) - # dst_net = self.get_ports_gateway6(port_pair[1]) - # dst_prefix_len = self.get_netmask_gateway6(port_pair[0]) - # # ignore entires with empty values - # if all((src_net, src_prefix_len, dst_net, dst_prefix_len)): - # new_ipv6_rules.append((cmd, self.txrx_pipeline, src_net, src_prefix_len, - # dst_net, dst_prefix_len, dst_port)) - # new_ipv6_rules.append((cmd, self.txrx_pipeline, dst_net, dst_prefix_len, - # src_net, src_prefix_len, src_port)) - - acl_apply = "\np %s applyruleset" % cmd - new_rules_config = '\n'.join(pattern.format(*values) for values - in chain(new_rules, new_ipv6_rules)) - return ''.join([rules_config, new_rules_config, acl_apply]) + def get_flows_config(self): + return self.flows if self.flows else '' def generate_script_data(self): self._port_pairs = PortPairs(self.vnfd_helper.interfaces) @@ -707,24 +617,15 @@ class MultiPortConfig(object): # disable IPv6 for now # 'arp_config6': self.generate_arp_config6(), 'arp_config6': "", - 'arp_config': self.generate_arp_config(), 'arp_route_tbl': self.generate_arp_route_tbl(), 'arp_route_tbl6': "", - 'actions': '', - 'rules': '', + 'flows': self.get_flows_config() } - - if self.vnf_type in ('ACL', 'VFW'): - script_data.update({ - 'actions': self.generate_action_config(), - 'rules': self.generate_rule_config(), - }) - return script_data - def generate_script(self, vnfd, rules=None): + def generate_script(self, vnfd, flows=None): self.vnfd = vnfd - self.rules = rules + self.flows = flows script_data = self.generate_script_data() script = SCRIPT_TPL.format(**script_data) if self.lb_config == self.HW_LB: diff --git a/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py b/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py deleted file mode 100644 index c538ceeba..000000000 --- a/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py +++ /dev/null @@ -1,334 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -import re -from itertools import product -import IxNetwork - - -log = logging.getLogger(__name__) - -IP_VERSION_4 = 4 -IP_VERSION_6 = 6 - - -class TrafficStreamHelper(object): - - TEMPLATE = '{0.traffic_item}/{0.stream}:{0.param_id}/{1}' - - def __init__(self, traffic_item, stream, param_id): - super(TrafficStreamHelper, self).__init__() - self.traffic_item = traffic_item - self.stream = stream - self.param_id = param_id - - def __getattr__(self, item): - return self.TEMPLATE.format(self, item) - - -class FramesizeHelper(object): - - def __init__(self): - super(FramesizeHelper, self).__init__() - self.weighted_pairs = [] - self.weighted_range_pairs = [] - - @property - def weighted_pairs_arg(self): - return '-weightedPairs', self.weighted_pairs - - @property - def weighted_range_pairs_arg(self): - return '-weightedRangePairs', self.weighted_range_pairs - - def make_args(self, *args): - return self.weighted_pairs_arg + self.weighted_range_pairs_arg + args - - def populate_data(self, framesize_data): - for key, value in framesize_data.items(): - if value == '0': - continue - - replaced = re.sub('[Bb]', '', key) - self.weighted_pairs.extend([ - replaced, - value, - ]) - pairs = [ - replaced, - replaced, - value, - ] - self.weighted_range_pairs.append(pairs) - - -class IxNextgen(object): - - STATS_NAME_MAP = { - "traffic_item": 'Traffic Item', - "Tx_Frames": 'Tx Frames', - "Rx_Frames": 'Rx Frames', - "Tx_Frame_Rate": 'Tx Frame Rate', - "Rx_Frame_Rate": 'Tx Frame Rate', - "Store-Forward_Avg_latency_ns": 'Store-Forward Avg Latency (ns)', - "Store-Forward_Min_latency_ns": 'Store-Forward Min Latency (ns)', - "Store-Forward_Max_latency_ns": 'Store-Forward Max Latency (ns)', - } - - PORT_STATS_NAME_MAP = { - "stat_name": 'Stat Name', - "Frames_Tx": 'Frames Tx.', - "Valid_Frames_Rx": 'Valid Frames Rx.', - "Frames_Tx_Rate": 'Frames Tx. Rate', - "Valid_Frames_Rx_Rate": 'Valid Frames Rx. Rate', - "Tx_Rate_Kbps": 'Tx. Rate (Kbps)', - "Rx_Rate_Kbps": 'Rx. Rate (Kbps)', - "Tx_Rate_Mbps": 'Tx. Rate (Mbps)', - "Rx_Rate_Mbps": 'Rx. Rate (Mbps)', - } - - LATENCY_NAME_MAP = { - "Store-Forward_Avg_latency_ns": 'Store-Forward Avg Latency (ns)', - "Store-Forward_Min_latency_ns": 'Store-Forward Min Latency (ns)', - "Store-Forward_Max_latency_ns": 'Store-Forward Max Latency (ns)', - } - - RANDOM_MASK_MAP = { - IP_VERSION_4: '0.0.0.255', - IP_VERSION_6: '0:0:0:0:0:0:0:ff', - } - - MODE_SEEDS_MAP = { - 0: ('uplink', ['256', '2048']), - } - - MODE_SEEDS_DEFAULT = 'downlink', ['2048', '256'] - - @staticmethod - def find_view_obj(view_name, views): - edited_view_name = '::ixNet::OBJ-/statistics/view:"{}"'.format(view_name) - return next((view for view in views if edited_view_name == view), '') - - @staticmethod - def get_config(tg_cfg): - card = [] - port = [] - external_interface = tg_cfg["vdu"][0]["external-interface"] - for intf in external_interface: - card_port0 = intf["virtual-interface"]["vpci"] - card0, port0 = card_port0.split(':')[:2] - card.append(card0) - port.append(port0) - - cfg = { - 'machine': tg_cfg["mgmt-interface"]["ip"], - 'port': tg_cfg["mgmt-interface"]["tg-config"]["tcl_port"], - 'chassis': tg_cfg["mgmt-interface"]["tg-config"]["ixchassis"], - 'cards': card, - 'ports': port, - 'output_dir': tg_cfg["mgmt-interface"]["tg-config"]["dut_result_dir"], - 'version': tg_cfg["mgmt-interface"]["tg-config"]["version"], - 'bidir': True, - } - - return cfg - - def __init__(self, ixnet=None): - self.ixnet = ixnet - self._objRefs = dict() - self._cfg = None - self._logger = logging.getLogger(__name__) - self._params = None - self._bidir = None - - def iter_over_get_lists(self, x1, x2, y2, offset=0): - for x in self.ixnet.getList(x1, x2): - y_list = self.ixnet.getList(x, y2) - for i, y in enumerate(y_list, offset): - yield x, y, i - - def set_random_ip_multi_attribute(self, ipv4, seed, fixed_bits, random_mask, l3_count): - self.ixnet.setMultiAttribute( - ipv4, - '-seed', str(seed), - '-fixedBits', str(fixed_bits), - '-randomMask', str(random_mask), - '-valueType', 'random', - '-countValue', str(l3_count)) - - def set_random_ip_multi_attributes(self, ip, version, seeds, l3): - try: - random_mask = self.RANDOM_MASK_MAP[version] - except KeyError: - raise ValueError('Unknown version %s' % version) - - l3_count = l3['count'] - if "srcIp" in ip: - fixed_bits = l3['srcip4'] - self.set_random_ip_multi_attribute(ip, seeds[0], fixed_bits, random_mask, l3_count) - if "dstIp" in ip: - fixed_bits = l3['dstip4'] - self.set_random_ip_multi_attribute(ip, seeds[1], fixed_bits, random_mask, l3_count) - - def add_ip_header(self, params, version): - for _, ep, i in self.iter_over_get_lists('/traffic', 'trafficItem', "configElement", 1): - iter1 = (v['outer_l3'] for v in params.values() if str(v['id']) == str(i)) - try: - l3 = next(iter1, {}) - seeds = self.MODE_SEEDS_MAP.get(i, self.MODE_SEEDS_DEFAULT)[1] - except (KeyError, IndexError): - continue - - for _, ip_bits, _ in self.iter_over_get_lists(ep, 'stack', 'field'): - self.set_random_ip_multi_attributes(ip_bits, version, seeds, l3) - - self.ixnet.commit() - - def _connect(self, tg_cfg): - self._cfg = self.get_config(tg_cfg) - self.ixnet = IxNetwork.IxNet() - - machine = self._cfg['machine'] - port = str(self._cfg['port']) - version = str(self._cfg['version']) - result = self.ixnet.connect(machine, '-port', port, '-version', version) - return result - - def clear_ixia_config(self): - self.ixnet.execute('newConfig') - - def load_ixia_profile(self, profile): - self.ixnet.execute('loadConfig', self.ixnet.readFrom(profile)) - - def ix_load_config(self, profile): - self.clear_ixia_config() - self.load_ixia_profile(profile) - - def ix_assign_ports(self): - vports = self.ixnet.getList(self.ixnet.getRoot(), 'vport') - ports = [] - - chassis = self._cfg['chassis'] - ports = [(chassis, card, port) for card, port in - zip(self._cfg['cards'], self._cfg['ports'])] - - vport_list = self.ixnet.getList("/", "vport") - self.ixnet.execute('assignPorts', ports, [], vport_list, True) - self.ixnet.commit() - - for vport in vports: - if self.ixnet.getAttribute(vport, '-state') != 'up': - log.error("Both thr ports are down...") - - def ix_update_frame(self, params): - streams = ["configElement"] - - for param in params.values(): - framesize_data = FramesizeHelper() - traffic_items = self.ixnet.getList('/traffic', 'trafficItem') - param_id = param['id'] - for traffic_item, stream in product(traffic_items, streams): - helper = TrafficStreamHelper(traffic_item, stream, param_id) - - self.ixnet.setMultiAttribute(helper.transmissionControl, - '-type', '{0}'.format(param.get('traffic_type', - 'continuous')), - '-duration', '{0}'.format(param.get('duration', - "30"))) - - stream_frame_rate_path = helper.frameRate - self.ixnet.setMultiAttribute(stream_frame_rate_path, '-rate', param['iload']) - if param['outer_l2']['framesPerSecond']: - self.ixnet.setMultiAttribute(stream_frame_rate_path, - '-type', 'framesPerSecond') - - framesize_data.populate_data(param['outer_l2']['framesize']) - - make_attr_args = framesize_data.make_args('-incrementFrom', '66', - '-randomMin', '66', - '-quadGaussian', [], - '-type', 'weightedPairs', - '-presetDistribution', 'cisco', - '-incrementTo', '1518') - - self.ixnet.setMultiAttribute(helper.frameSize, *make_attr_args) - - self.ixnet.commit() - - def update_ether_multi_attribute(self, ether, mac_addr): - self.ixnet.setMultiAttribute(ether, - '-singleValue', mac_addr, - '-fieldValue', mac_addr, - '-valueType', 'singleValue') - - def update_ether_multi_attributes(self, ether, l2): - if "ethernet.header.destinationAddress" in ether: - self.update_ether_multi_attribute(ether, str(l2.get('dstmac', "00:00:00:00:00:02"))) - - if "ethernet.header.sourceAddress" in ether: - self.update_ether_multi_attribute(ether, str(l2.get('srcmac', "00:00:00:00:00:01"))) - - def ix_update_ether(self, params): - for _, ep, index in self.iter_over_get_lists('/traffic', 'trafficItem', - "configElement", 1): - iter1 = (v['outer_l2'] for v in params.values() if str(v['id']) == str(index)) - try: - l2 = next(iter1, {}) - except KeyError: - continue - - for _, ether, _ in self.iter_over_get_lists(ep, 'stack', 'field'): - self.update_ether_multi_attributes(ether, l2) - - self.ixnet.commit() - - def ix_update_udp(self, params): - pass - - def ix_update_tcp(self, params): - pass - - def ix_start_traffic(self): - tis = self.ixnet.getList('/traffic', 'trafficItem') - for ti in tis: - self.ixnet.execute('generate', [ti]) - self.ixnet.execute('apply', '/traffic') - self.ixnet.execute('start', '/traffic') - - def ix_stop_traffic(self): - tis = self.ixnet.getList('/traffic', 'trafficItem') - for _ in tis: - self.ixnet.execute('stop', '/traffic') - - def build_stats_map(self, view_obj, name_map): - return {kl: self.execute_get_column_values(view_obj, kr) for kl, kr in name_map.items()} - - def execute_get_column_values(self, view_obj, name): - return self.ixnet.execute('getColumnValues', view_obj, name) - - def ix_get_statistics(self): - views = self.ixnet.getList('/statistics', 'view') - stats = {} - view_obj = self.find_view_obj("Traffic Item Statistics", views) - stats = self.build_stats_map(view_obj, self.STATS_NAME_MAP) - - view_obj = self.find_view_obj("Port Statistics", views) - ports_stats = self.build_stats_map(view_obj, self.PORT_STATS_NAME_MAP) - - view_obj = self.find_view_obj("Flow Statistics", views) - stats["latency"] = self.build_stats_map(view_obj, self.LATENCY_NAME_MAP) - - return stats, ports_stats diff --git a/yardstick/network_services/libs/ixia_libs/IxNet/__init__.py b/yardstick/network_services/libs/ixia_libs/ixnet/__init__.py index e69de29bb..e69de29bb 100644 --- a/yardstick/network_services/libs/ixia_libs/IxNet/__init__.py +++ b/yardstick/network_services/libs/ixia_libs/ixnet/__init__.py diff --git a/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py b/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py new file mode 100644 index 000000000..74deeecb5 --- /dev/null +++ b/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py @@ -0,0 +1,471 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import IxNetwork + +from yardstick.common import exceptions +from yardstick.common import utils + + +log = logging.getLogger(__name__) + +IP_VERSION_4 = 4 +IP_VERSION_6 = 6 + +PROTO_ETHERNET = 'ethernet' +PROTO_IPV4 = 'ipv4' +PROTO_IPV6 = 'ipv6' +PROTO_UDP = 'udp' +PROTO_TCP = 'tcp' +PROTO_VLAN = 'vlan' + +IP_VERSION_4_MASK = '0.0.0.255' +IP_VERSION_6_MASK = '0:0:0:0:0:0:0:ff' + +TRAFFIC_STATUS_STARTED = 'started' +TRAFFIC_STATUS_STOPPED = 'stopped' + + +# NOTE(ralonsoh): this pragma will be removed in the last patch of this series +class IxNextgen(object): # pragma: no cover + + PORT_STATS_NAME_MAP = { + "stat_name": 'Stat Name', + "Frames_Tx": 'Frames Tx.', + "Valid_Frames_Rx": 'Valid Frames Rx.', + "Frames_Tx_Rate": 'Frames Tx. Rate', + "Valid_Frames_Rx_Rate": 'Valid Frames Rx. Rate', + "Tx_Rate_Kbps": 'Tx. Rate (Kbps)', + "Rx_Rate_Kbps": 'Rx. Rate (Kbps)', + "Tx_Rate_Mbps": 'Tx. Rate (Mbps)', + "Rx_Rate_Mbps": 'Rx. Rate (Mbps)', + } + + LATENCY_NAME_MAP = { + "Store-Forward_Avg_latency_ns": 'Store-Forward Avg Latency (ns)', + "Store-Forward_Min_latency_ns": 'Store-Forward Min Latency (ns)', + "Store-Forward_Max_latency_ns": 'Store-Forward Max Latency (ns)', + } + + @staticmethod + def get_config(tg_cfg): + card = [] + port = [] + external_interface = tg_cfg["vdu"][0]["external-interface"] + for intf in external_interface: + card_port0 = intf["virtual-interface"]["vpci"] + card0, port0 = card_port0.split(':')[:2] + card.append(card0) + port.append(port0) + + cfg = { + 'machine': tg_cfg["mgmt-interface"]["ip"], + 'port': tg_cfg["mgmt-interface"]["tg-config"]["tcl_port"], + 'chassis': tg_cfg["mgmt-interface"]["tg-config"]["ixchassis"], + 'cards': card, + 'ports': port, + 'output_dir': tg_cfg["mgmt-interface"]["tg-config"]["dut_result_dir"], + 'version': tg_cfg["mgmt-interface"]["tg-config"]["version"], + 'bidir': True, + } + + return cfg + + def __init__(self): # pragma: no cover + self._ixnet = None + self._cfg = None + self._params = None + self._bidir = None + + @property + def ixnet(self): # pragma: no cover + if self._ixnet: + return self._ixnet + raise exceptions.IxNetworkClientNotConnected() + + def _get_config_element_by_flow_group_name(self, flow_group_name): + """Get a config element using the flow group name + + Each named flow group contains one config element (by configuration). + According to the documentation, "configElements" is a list and "each + item in this list is aligned to the sequential order of your endpoint + list". + + :param flow_group_name: (str) flow group name; this parameter is + always a number (converted to string) starting + from "1". + :return: (str) config element reference ID or None. + """ + traffic_item = self.ixnet.getList(self.ixnet.getRoot() + '/traffic', + 'trafficItem')[0] + flow_groups = self.ixnet.getList(traffic_item, 'endpointSet') + for flow_group in flow_groups: + if (str(self.ixnet.getAttribute(flow_group, '-name')) == + flow_group_name): + return traffic_item + '/configElement:' + flow_group_name + + def _get_stack_item(self, flow_group_name, protocol_name): + """Return the stack item given the flow group name and the proto name + + :param flow_group_name: (str) flow group name + :param protocol_name: (str) protocol name, referred to PROTO_* + constants + :return: list of stack item descriptors + """ + celement = self._get_config_element_by_flow_group_name(flow_group_name) + if not celement: + raise exceptions.IxNetworkFlowNotPresent( + flow_group=flow_group_name) + stack_items = self.ixnet.getList(celement, 'stack') + return [s_i for s_i in stack_items if protocol_name in s_i] + + def _get_field_in_stack_item(self, stack_item, field_name): + """Return the field in a stack item given the name + + :param stack_item: (str) stack item descriptor + :param field_name: (str) field name + :return: (str) field descriptor + """ + fields = self.ixnet.getList(stack_item, 'field') + for field in (field for field in fields if field_name in field): + return field + raise exceptions.IxNetworkFieldNotPresentInStackItem( + field_name=field_name, stack_item=stack_item) + + def _get_traffic_state(self): + """Get traffic state""" + return self.ixnet.getAttribute(self.ixnet.getRoot() + 'traffic', + '-state') + + def is_traffic_running(self): + """Returns true if traffic state == TRAFFIC_STATUS_STARTED""" + return self._get_traffic_state() == TRAFFIC_STATUS_STARTED + + def is_traffic_stopped(self): + """Returns true if traffic state == TRAFFIC_STATUS_STOPPED""" + return self._get_traffic_state() == TRAFFIC_STATUS_STOPPED + + @staticmethod + def _parse_framesize(framesize): + """Parse "framesize" config param. to return a list of weighted pairs + + :param framesize: dictionary of frame sizes and weights + :return: list of paired frame sizes and weights + """ + weighted_range_pairs = [] + for size, weight in ((s, w) for (s, w) in framesize.items() + if int(w) != 0): + size = int(size.upper().replace('B', '')) + weighted_range_pairs.append([size, size, int(weight)]) + return weighted_range_pairs + + def iter_over_get_lists(self, x1, x2, y2, offset=0): + for x in self.ixnet.getList(x1, x2): + y_list = self.ixnet.getList(x, y2) + for i, y in enumerate(y_list, offset): + yield x, y, i + + def connect(self, tg_cfg): + self._cfg = self.get_config(tg_cfg) + self._ixnet = IxNetwork.IxNet() + + machine = self._cfg['machine'] + port = str(self._cfg['port']) + version = str(self._cfg['version']) + return self.ixnet.connect(machine, '-port', port, + '-version', version) + + def clear_config(self): + """Wipe out any possible configuration present in the client""" + self.ixnet.execute('newConfig') + + def assign_ports(self): + """Create and assign vports for each physical port defined in config + + This configuration is present in the IXIA profile file. E.g.: + name: trafficgen_1 + role: IxNet + interfaces: + xe0: + vpci: "2:15" # Card:port + driver: "none" + dpdk_port_num: 0 + local_ip: "152.16.100.20" + netmask: "255.255.0.0" + local_mac: "00:98:10:64:14:00" + xe1: + ... + """ + chassis_ip = self._cfg['chassis'] + ports = [(chassis_ip, card, port) for card, port in + zip(self._cfg['cards'], self._cfg['ports'])] + + log.info('Create and assign vports: %s', ports) + for port in ports: + vport = self.ixnet.add(self.ixnet.getRoot(), 'vport') + self.ixnet.commit() + self.ixnet.execute('assignPorts', [port], [], [vport], True) + self.ixnet.commit() + if self.ixnet.getAttribute(vport, '-state') != 'up': + log.warning('Port %s is down', vport) + + def _create_traffic_item(self): + """Create the traffic item to hold the flow groups + + The traffic item tracking by "Traffic Item" is enabled to retrieve the + latency statistics. + """ + log.info('Create the traffic item "RFC2544"') + traffic_item = self.ixnet.add(self.ixnet.getRoot() + '/traffic', + 'trafficItem') + self.ixnet.setMultiAttribute(traffic_item, '-name', 'RFC2544', + '-trafficType', 'raw') + self.ixnet.commit() + + traffic_item_id = self.ixnet.remapIds(traffic_item)[0] + self.ixnet.setAttribute(traffic_item_id + '/tracking', + '-trackBy', 'trafficGroupId0') + self.ixnet.commit() + + def _create_flow_groups(self): + """Create the flow groups between the assigned ports""" + traffic_item_id = self.ixnet.getList(self.ixnet.getRoot() + 'traffic', + 'trafficItem')[0] + log.info('Create the flow groups') + vports = self.ixnet.getList(self.ixnet.getRoot(), 'vport') + uplink_ports = vports[::2] + downlink_ports = vports[1::2] + index = 0 + for up, down in zip(uplink_ports, downlink_ports): + log.info('FGs: %s <--> %s', up, down) + endpoint_set_1 = self.ixnet.add(traffic_item_id, 'endpointSet') + endpoint_set_2 = self.ixnet.add(traffic_item_id, 'endpointSet') + self.ixnet.setMultiAttribute( + endpoint_set_1, '-name', str(index + 1), + '-sources', [up + '/protocols'], + '-destinations', [down + '/protocols']) + self.ixnet.setMultiAttribute( + endpoint_set_2, '-name', str(index + 2), + '-sources', [down + '/protocols'], + '-destinations', [up + '/protocols']) + self.ixnet.commit() + index += 2 + + def _append_procotol_to_stack(self, protocol_name, previous_element): + """Append a new element in the packet definition stack""" + protocol = (self.ixnet.getRoot() + + '/traffic/protocolTemplate:"{}"'.format(protocol_name)) + self.ixnet.execute('append', previous_element, protocol) + + def _setup_config_elements(self): + """Setup the config elements + + The traffic item is configured to allow individual configurations per + config element. The default frame configuration is applied: + Ethernet II: added by default + IPv4: element to add + UDP: element to add + Payload: added by default + Ethernet II (Trailer): added by default + :return: + """ + traffic_item_id = self.ixnet.getList(self.ixnet.getRoot() + 'traffic', + 'trafficItem')[0] + log.info('Split the frame rate distribution per config element') + config_elements = self.ixnet.getList(traffic_item_id, 'configElement') + for config_element in config_elements: + self.ixnet.setAttribute(config_element + '/frameRateDistribution', + '-portDistribution', 'splitRateEvenly') + self.ixnet.setAttribute(config_element + '/frameRateDistribution', + '-streamDistribution', 'splitRateEvenly') + self.ixnet.commit() + self._append_procotol_to_stack( + PROTO_UDP, config_element + '/stack:"ethernet-1"') + self._append_procotol_to_stack( + PROTO_IPV4, config_element + '/stack:"ethernet-1"') + + def create_traffic_model(self): + """Create a traffic item and the needed flow groups + + Each flow group inside the traffic item (only one is present) + represents the traffic between two ports: + (uplink) (downlink) + FlowGroup1: port1 -> port2 + FlowGroup2: port1 <- port2 + FlowGroup3: port3 -> port4 + FlowGroup4: port3 <- port4 + """ + self._create_traffic_item() + self._create_flow_groups() + self._setup_config_elements() + + def _update_frame_mac(self, ethernet_descriptor, field, mac_address): + """Set the MAC address in a config element stack Ethernet field + + :param ethernet_descriptor: (str) ethernet descriptor, e.g.: + /traffic/trafficItem:1/configElement:1/stack:"ethernet-1" + :param field: (str) field name, e.g.: destinationAddress + :param mac_address: (str) MAC address + """ + field_descriptor = self._get_field_in_stack_item(ethernet_descriptor, + field) + self.ixnet.setMultiAttribute(field_descriptor, + '-singleValue', mac_address, + '-fieldValue', mac_address, + '-valueType', 'singleValue') + self.ixnet.commit() + + def update_frame(self, traffic): + """Update the L2 frame + + This function updates the L2 frame options: + - Traffic type: "continuous", "fixedDuration". + - Duration: in case of traffic_type="fixedDuration", amount of seconds + to inject traffic. + - Rate: in frames per seconds or percentage. + - Type of rate: "framesPerSecond" ("bitsPerSecond" and + "percentLineRate" no used) + - Frame size: custom IMIX [1] definition; a list of packet size in + bytes and the weight. E.g.: + [[64, 64, 10], [128, 128, 15], [512, 512, 5]] + + [1] https://en.wikipedia.org/wiki/Internet_Mix + + :param traffic: list of traffic elements; each traffic element contains + the injection parameter for each flow group. + """ + for traffic_param in traffic.values(): + fg_id = str(traffic_param['id']) + config_element = self._get_config_element_by_flow_group_name(fg_id) + if not config_element: + raise exceptions.IxNetworkFlowNotPresent(flow_group=fg_id) + + type = traffic_param.get('traffic_type', 'fixedDuration') + duration = traffic_param.get('duration', 30) + rate = traffic_param['iload'] + weighted_range_pairs = self._parse_framesize( + traffic_param['outer_l2']['framesize']) + srcmac = str(traffic_param.get('srcmac', '00:00:00:00:00:01')) + dstmac = str(traffic_param.get('dstmac', '00:00:00:00:00:02')) + # NOTE(ralonsoh): add QinQ tagging when + # traffic_param['outer_l2']['QinQ'] exists. + # s_vlan = traffic_param['outer_l2']['QinQ']['S-VLAN'] + # c_vlan = traffic_param['outer_l2']['QinQ']['C-VLAN'] + + self.ixnet.setMultiAttribute( + config_element + '/transmissionControl', + '-type', type, '-duration', duration) + self.ixnet.setMultiAttribute( + config_element + '/frameRate', + '-rate', rate, '-type', 'framesPerSecond') + self.ixnet.setMultiAttribute( + config_element + '/frameSize', + '-type', 'weightedPairs', + '-weightedRangePairs', weighted_range_pairs) + self.ixnet.commit() + + self._update_frame_mac( + self._get_stack_item(fg_id, PROTO_ETHERNET)[0], + 'destinationAddress', dstmac) + self._update_frame_mac( + self._get_stack_item(fg_id, PROTO_ETHERNET)[0], + 'sourceAddress', srcmac) + + def _update_ipv4_address(self, ip_descriptor, field, ip_address, seed, + mask, count): + """Set the IPv4 address in a config element stack IP field + + :param ip_descriptor: (str) IP descriptor, e.g.: + /traffic/trafficItem:1/configElement:1/stack:"ipv4-2" + :param field: (str) field name, e.g.: scrIp, dstIp + :param ip_address: (str) IP address + :param seed: (int) seed length + :param mask: (str) IP address mask + :param count: (int) number of random IPs to generate + """ + field_descriptor = self._get_field_in_stack_item(ip_descriptor, + field) + self.ixnet.setMultiAttribute(field_descriptor, + '-seed', seed, + '-fixedBits', ip_address, + '-randomMask', mask, + '-valueType', 'random', + '-countValue', count) + self.ixnet.commit() + + def update_ip_packet(self, traffic): + """Update the IP packet + + NOTE: Only IPv4 is currently supported. + :param traffic: list of traffic elements; each traffic element contains + the injection parameter for each flow group. + """ + # NOTE(ralonsoh): L4 configuration is not set. + for traffic_param in traffic.values(): + fg_id = str(traffic_param['id']) + if not self._get_config_element_by_flow_group_name(fg_id): + raise exceptions.IxNetworkFlowNotPresent(flow_group=fg_id) + + count = traffic_param['outer_l3']['count'] + srcip4 = str(traffic_param['outer_l3']['srcip4']) + dstip4 = str(traffic_param['outer_l3']['dstip4']) + + self._update_ipv4_address( + self._get_stack_item(fg_id, PROTO_IPV4)[0], + 'srcIp', srcip4, 1, IP_VERSION_4_MASK, count) + self._update_ipv4_address( + self._get_stack_item(fg_id, PROTO_IPV4)[0], + 'dstIp', dstip4, 1, IP_VERSION_4_MASK, count) + + def _build_stats_map(self, view_obj, name_map): + return {data_yardstick: self.ixnet.execute( + 'getColumnValues', view_obj, data_ixia) + for data_yardstick, data_ixia in name_map.items()} + + def get_statistics(self): + """Retrieve port and flow statistics + + "Port Statistics" parameters are stored in self.PORT_STATS_NAME_MAP. + "Flow Statistics" parameters are stored in self.LATENCY_NAME_MAP. + + :return: dictionary with the statistics; the keys of this dictionary + are PORT_STATS_NAME_MAP and LATENCY_NAME_MAP keys. + """ + port_statistics = '::ixNet::OBJ-/statistics/view:"Port Statistics"' + flow_statistics = '::ixNet::OBJ-/statistics/view:"Flow Statistics"' + stats = self._build_stats_map(port_statistics, + self.PORT_STATS_NAME_MAP) + stats.update(self._build_stats_map(flow_statistics, + self.LATENCY_NAME_MAP)) + return stats + + def start_traffic(self): + """Start the traffic injection in the traffic item + + By configuration, there is only one traffic item. This function returns + when the traffic state is TRAFFIC_STATUS_STARTED. + """ + traffic_items = self.ixnet.getList('/traffic', 'trafficItem') + if self.is_traffic_running(): + self.ixnet.execute('stop', '/traffic') + # pylint: disable=unnecessary-lambda + utils.wait_until_true(lambda: self.is_traffic_stopped()) + + self.ixnet.execute('generate', traffic_items) + self.ixnet.execute('apply', '/traffic') + self.ixnet.execute('start', '/traffic') + # pylint: disable=unnecessary-lambda + utils.wait_until_true(lambda: self.is_traffic_running()) diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py index 0c0bf223a..5922bd3b9 100644 --- a/yardstick/network_services/nfvi/resource.py +++ b/yardstick/network_services/nfvi/resource.py @@ -31,6 +31,7 @@ from yardstick.common.exceptions import ResourceCommandError from yardstick.common.task_template import finalize_for_yaml from yardstick.common.utils import validate_non_string_sequence from yardstick.network_services.nfvi.collectd import AmqpConsumer +from yardstick.benchmark.contexts import heat LOG = logging.getLogger(__name__) @@ -52,7 +53,8 @@ class ResourceProfile(object): DEFAULT_TIMEOUT = 3600 OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock" - def __init__(self, mgmt, port_names=None, plugins=None, interval=None, timeout=None): + def __init__(self, mgmt, port_names=None, plugins=None, + interval=None, timeout=None, reset_mq_flag=True): if plugins is None: self.plugins = {} @@ -77,6 +79,7 @@ class ResourceProfile(object): # we need to save mgmt so we can connect to port 5672 self.mgmt = mgmt self.connection = ssh.AutoConnectSSH.from_node(mgmt) + self._reset_mq_flag = reset_mq_flag @classmethod def make_from_node(cls, node, timeout): @@ -87,7 +90,10 @@ class ResourceProfile(object): plugins = collectd_options.get("plugins", {}) interval = collectd_options.get("interval") - return cls(node, plugins=plugins, interval=interval, timeout=timeout) + reset_mq_flag = (False if node.get("ctx_type") == heat.HeatContext.__context_type__ + else True) + return cls(node, plugins=plugins, interval=interval, + timeout=timeout, reset_mq_flag=reset_mq_flag) def check_if_system_agent_running(self, process): """ verify if system agent is running """ @@ -210,11 +216,14 @@ class ResourceProfile(object): if not self.enable: return {} + if self.check_if_system_agent_running("collectd")[0] != 0: + return {} + metric = {} while not self._queue.empty(): metric.update(self._queue.get()) - msg = self.parse_collectd_result(metric) - return msg + + return self.parse_collectd_result(metric) def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs): template = pkg_resources.resource_string("yardstick.network_services.nfvi", @@ -250,7 +259,7 @@ class ResourceProfile(object): if status != 0: LOG.error("cannot find OVS socket %s", socket_path) - def _start_rabbitmq(self, connection): + def _reset_rabbitmq(self, connection): # Reset amqp queue LOG.debug("reset and setup amqp to collect data from collectd") # ensure collectd.conf.d exists to avoid error/warning @@ -263,10 +272,37 @@ class ResourceProfile(object): "sudo rabbitmqctl authenticate_user admin admin", "sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'" ] + for cmd in cmd_list: - exit_status, stdout, stderr = connection.execute(cmd) - if exit_status != 0: - raise ResourceCommandError(command=cmd, stderr=stderr) + exit_status, _, stderr = connection.execute(cmd) + if exit_status != 0: + raise ResourceCommandError(command=cmd, stderr=stderr) + + def _check_rabbitmq_user(self, connection, user='admin'): + exit_status, stdout, _ = connection.execute("sudo rabbitmqctl list_users") + if exit_status == 0: + for line in stdout.split('\n')[1:]: + if line.split('\t')[0] == user: + return True + + def _set_rabbitmq_admin_user(self, connection): + LOG.debug("add admin user to amqp") + cmd_list = ["sudo rabbitmqctl add_user admin admin", + "sudo rabbitmqctl authenticate_user admin admin", + "sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'" + ] + + for cmd in cmd_list: + exit_status, stdout, stderr = connection.execute(cmd) + if exit_status != 0: + raise ResourceCommandError(command=cmd, stdout=stdout, stderr=stderr) + + def _start_rabbitmq(self, connection): + if self._reset_mq_flag: + self._reset_rabbitmq(connection) + else: + if not self._check_rabbitmq_user(connection): + self._set_rabbitmq_admin_user(connection) # check stdout for "sudo rabbitmqctl status" command cmd = "sudo rabbitmqctl status" @@ -282,10 +318,11 @@ class ResourceProfile(object): self._prepare_collectd_conf(config_file_path) connection.execute('sudo pkill -x -9 collectd') - exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0] + cmd = "which %s > /dev/null 2>&1" % collectd_path + exit_status, _, stderr = connection.execute(cmd) if exit_status != 0: - LOG.warning("%s is not present disabling", collectd_path) - return + raise ResourceCommandError(command=cmd, stderr=stderr) + if "ovs_stats" in self.plugins: self._setup_ovs_stats(connection) @@ -293,8 +330,12 @@ class ResourceProfile(object): LOG.debug("Start collectd service..... %s second timeout", self.timeout) # intel_pmu plug requires large numbers of files open, so try to set # ulimit -n to a large value - connection.execute("sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path, - timeout=self.timeout) + + cmd = "sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path + exit_status, _, stderr = connection.execute(cmd, timeout=self.timeout) + if exit_status != 0: + raise ResourceCommandError(command=cmd, stderr=stderr) + LOG.debug("Done") def initiate_systemagent(self, bin_path): @@ -334,5 +375,7 @@ class ResourceProfile(object): if pid: self.connection.execute('sudo kill -9 "%s"' % pid) self.connection.execute('sudo pkill -9 "%s"' % agent) - self.connection.execute('sudo service rabbitmq-server stop') - self.connection.execute("sudo rabbitmqctl stop_app") + + if self._reset_mq_flag: + self.connection.execute('sudo service rabbitmq-server stop') + self.connection.execute("sudo rabbitmqctl stop_app") diff --git a/yardstick/network_services/traffic_profile/base.py b/yardstick/network_services/traffic_profile/base.py index 162bab2bc..f4b5b178c 100644 --- a/yardstick/network_services/traffic_profile/base.py +++ b/yardstick/network_services/traffic_profile/base.py @@ -16,6 +16,31 @@ from yardstick.common import exceptions from yardstick.common import utils +class TrafficProfileConfig(object): + """Class to contain the TrafficProfile class information + + This object will parse and validate the traffic profile information. + """ + + DEFAULT_SCHEMA = 'nsb:traffic_profile:0.1' + DEFAULT_FRAME_RATE = 100 + DEFAULT_DURATION = 30 + + def __init__(self, tp_config): + self.schema = tp_config.get('schema', self.DEFAULT_SCHEMA) + self.name = tp_config.get('name') + self.description = tp_config.get('description') + tprofile = tp_config['traffic_profile'] + self.traffic_type = tprofile.get('traffic_type') + self.frame_rate = tprofile.get('frame_rate', self.DEFAULT_FRAME_RATE) + self.test_precision = tprofile.get('test_precision') + self.packet_sizes = tprofile.get('packet_sizes') + self.duration = tprofile.get('duration', self.DEFAULT_DURATION) + self.lower_bound = tprofile.get('lower_bound') + self.upper_bound = tprofile.get('upper_bound') + self.step_interval = tprofile.get('step_interval') + + class TrafficProfile(object): """ This class defines the behavior @@ -43,8 +68,9 @@ class TrafficProfile(object): # e.g. RFC2544 start_ip, stop_ip, drop_rate, # IMIX = {"10K": 0.1, "100M": 0.5} self.params = tp_config + self.config = TrafficProfileConfig(tp_config) - def execute_traffic(self, traffic_generator): + def execute_traffic(self, traffic_generator, **kawrgs): """ This methods defines the behavior of the traffic generator. It will be called in a loop until the traffic generator exits. diff --git a/yardstick/network_services/traffic_profile/http_ixload.py b/yardstick/network_services/traffic_profile/http_ixload.py index 348056551..6cbdb8ab2 100644 --- a/yardstick/network_services/traffic_profile/http_ixload.py +++ b/yardstick/network_services/traffic_profile/http_ixload.py @@ -12,9 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import -from __future__ import print_function - import sys import os import logging @@ -27,22 +24,14 @@ try: except ImportError: import json as jsonutils - -class ErrorClass(object): - - def __init__(self, *args, **kwargs): - if 'test' not in kwargs: - raise RuntimeError - - def __getattr__(self, item): - raise AttributeError - +from yardstick.common import exceptions try: from IxLoad import IxLoad, StatCollectorUtils except ImportError: - IxLoad = ErrorClass - StatCollectorUtils = ErrorClass + IxLoad = exceptions.ErrorClass + StatCollectorUtils = exceptions.ErrorClass + LOG = logging.getLogger(__name__) CSV_FILEPATH_NAME = 'IxL_statResults.csv' @@ -93,7 +82,7 @@ def validate_non_string_sequence(value, default=None, raise_exc=None): if isinstance(value, collections.Sequence) and not isinstance(value, str): return value if raise_exc: - raise raise_exc + raise raise_exc # pylint: disable=raising-bad-type return default @@ -218,7 +207,7 @@ class IXLOADHttpTest(object): # ---- Remap ports ---- try: self.reassign_ports(test, repository, self.ports_to_reassign) - except Exception: + except Exception: # pylint: disable=broad-except LOG.exception("Exception occurred during reassign_ports") # ----------------------------------------------------------------------- diff --git a/yardstick/network_services/traffic_profile/ixia_rfc2544.py b/yardstick/network_services/traffic_profile/ixia_rfc2544.py index 7f047226b..39336785e 100644 --- a/yardstick/network_services/traffic_profile/ixia_rfc2544.py +++ b/yardstick/network_services/traffic_profile/ixia_rfc2544.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import import logging from yardstick.network_services.traffic_profile.trex_traffic_profile import \ @@ -26,6 +25,10 @@ class IXIARFC2544Profile(TrexProfile): UPLINK = 'uplink' DOWNLINK = 'downlink' + def __init__(self, yaml_data): + super(IXIARFC2544Profile, self).__init__(yaml_data) + self.rate = self.config.frame_rate + def _get_ixia_traffic_profile(self, profile_data, mac=None): if mac is None: mac = {} @@ -82,13 +85,10 @@ class IXIARFC2544Profile(TrexProfile): def _ixia_traffic_generate(self, traffic, ixia_obj): for key, value in traffic.items(): if key.startswith((self.UPLINK, self.DOWNLINK)): - value["iload"] = str(self.rate) - ixia_obj.ix_update_frame(traffic) - ixia_obj.ix_update_ether(traffic) - ixia_obj.add_ip_header(traffic, 4) - ixia_obj.ix_start_traffic() - self.tmp_drop = 0 - self.tmp_throughput = 0 + value['iload'] = str(self.rate) + ixia_obj.update_frame(traffic) + ixia_obj.update_ip_packet(traffic) + ixia_obj.start_traffic() def update_traffic_profile(self, traffic_generator): def port_generator(): @@ -99,85 +99,65 @@ class IXIARFC2544Profile(TrexProfile): if not profile_data: continue self.profile_data = profile_data - self.get_streams(self.profile_data) self.full_profile.update({vld_id: self.profile_data}) for intf in intfs: yield traffic_generator.vnfd_helper.port_num(intf) self.ports = [port for port in port_generator()] - def execute_traffic(self, traffic_generator, ixia_obj, mac=None): - if mac is None: - mac = {} + def execute_traffic(self, traffic_generator, ixia_obj=None, mac=None): + mac = {} if mac is None else mac + first_run = self.first_run if self.first_run: + self.first_run = False self.full_profile = {} self.pg_id = 0 self.update_traffic_profile(traffic_generator) - traffic = \ - self._get_ixia_traffic_profile(self.full_profile, mac) self.max_rate = self.rate self.min_rate = 0 - self.get_multiplier() - self._ixia_traffic_generate(traffic, ixia_obj) - - def get_multiplier(self): - self.rate = round((self.max_rate + self.min_rate) / 2.0, 2) - multiplier = round(self.rate / self.pps, 2) - return str(multiplier) + else: + self.rate = round(float(self.max_rate + self.min_rate) / 2.0, 2) - def start_ixia_latency(self, traffic_generator, ixia_obj, mac=None): - if mac is None: - mac = {} - self.update_traffic_profile(traffic_generator) - traffic = \ - self._get_ixia_traffic_profile(self.full_profile, mac) + traffic = self._get_ixia_traffic_profile(self.full_profile, mac) self._ixia_traffic_generate(traffic, ixia_obj) + return first_run - def get_drop_percentage(self, samples, tol_min, tolerance, ixia_obj, - mac=None): - if mac is None: - mac = {} - status = 'Running' + def get_drop_percentage(self, samples, tol_min, tolerance, duration=30.0, + first_run=False): + completed = False drop_percent = 100 - in_packets = sum([samples[iface]['in_packets'] for iface in samples]) - out_packets = sum([samples[iface]['out_packets'] for iface in samples]) - rx_throughput = \ - sum([samples[iface]['RxThroughput'] for iface in samples]) - tx_throughput = \ - sum([samples[iface]['TxThroughput'] for iface in samples]) - packet_drop = abs(out_packets - in_packets) + num_ifaces = len(samples) + in_packets_sum = sum( + [samples[iface]['in_packets'] for iface in samples]) + out_packets_sum = sum( + [samples[iface]['out_packets'] for iface in samples]) + rx_throughput = sum( + [samples[iface]['RxThroughput'] for iface in samples]) + rx_throughput = round(float(rx_throughput), 2) + tx_throughput = sum( + [samples[iface]['TxThroughput'] for iface in samples]) + tx_throughput = round(float(tx_throughput), 2) + packet_drop = abs(out_packets_sum - in_packets_sum) + try: - drop_percent = round((packet_drop / float(out_packets)) * 100, 2) + drop_percent = round( + (packet_drop / float(out_packets_sum)) * 100, 2) except ZeroDivisionError: LOG.info('No traffic is flowing') - samples['TxThroughput'] = round(tx_throughput / 1.0, 2) - samples['RxThroughput'] = round(rx_throughput / 1.0, 2) - samples['CurrentDropPercentage'] = drop_percent - samples['Throughput'] = self.tmp_throughput - samples['DropPercentage'] = self.tmp_drop - if drop_percent > tolerance and self.tmp_throughput == 0: - samples['Throughput'] = round(rx_throughput / 1.0, 2) - samples['DropPercentage'] = drop_percent - if self.first_run: - max_supported_rate = out_packets / 30.0 - self.rate = max_supported_rate - self.first_run = False - if drop_percent <= tolerance: - status = 'Completed' + + samples['TxThroughput'] = tx_throughput + samples['RxThroughput'] = rx_throughput + samples['DropPercentage'] = drop_percent + + if first_run: + self.rate = out_packets_sum / duration / num_ifaces + completed = True if drop_percent <= tolerance else False + if drop_percent > tolerance: self.max_rate = self.rate elif drop_percent < tol_min: self.min_rate = self.rate - if drop_percent >= self.tmp_drop: - self.tmp_drop = drop_percent - self.tmp_throughput = round((rx_throughput / 1.0), 2) - samples['Throughput'] = round(rx_throughput / 1.0, 2) - samples['DropPercentage'] = drop_percent else: - samples['Throughput'] = round(rx_throughput / 1.0, 2) - samples['DropPercentage'] = drop_percent - return status, samples - self.get_multiplier() - traffic = self._get_ixia_traffic_profile(self.full_profile, mac) - self._ixia_traffic_generate(traffic, ixia_obj) - return status, samples + completed = True + + return completed, samples diff --git a/yardstick/network_services/traffic_profile/prox_binsearch.py b/yardstick/network_services/traffic_profile/prox_binsearch.py index 1e926aca2..9457096c8 100644 --- a/yardstick/network_services/traffic_profile/prox_binsearch.py +++ b/yardstick/network_services/traffic_profile/prox_binsearch.py @@ -21,6 +21,7 @@ import time from yardstick.network_services.traffic_profile.prox_profile import ProxProfile from yardstick.network_services import constants +from yardstick.common import constants as overall_constants LOG = logging.getLogger(__name__) @@ -84,9 +85,14 @@ class ProxBinSearchProfile(ProxProfile): # success, the binary search will complete on an integer multiple # of the precision, rather than on a fraction of it. - theor_max_thruput = 0 + theor_max_thruput = actual_max_thruput = 0 result_samples = {} + rate_samples = {} + pos_retry = 0 + neg_retry = 0 + total_retry = 0 + ok_retry = 0 # Store one time only value in influxdb single_samples = { @@ -102,51 +108,91 @@ class ProxBinSearchProfile(ProxProfile): successful_pkt_loss = 0.0 line_speed = traffic_gen.scenario_helper.all_options.get( "interface_speed_gbps", constants.NIC_GBPS_DEFAULT) * constants.ONE_GIGABIT_IN_BITS - for test_value in self.bounds_iterator(LOG): - result, port_samples = self._profile_helper.run_test(pkt_size, duration, - test_value, - self.tolerated_loss, - line_speed) - self.curr_time = time.time() - diff_time = self.curr_time - self.prev_time - self.prev_time = self.curr_time - - if result.success: - LOG.debug("Success! Increasing lower bound") - self.current_lower = test_value - successful_pkt_loss = result.pkt_loss - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) - - # store results with success tag in influxdb - success_samples = {'Success_' + key: value for key, value in samples.items()} - - success_samples["Success_rx_total"] = int(result.rx_total / diff_time) - success_samples["Success_tx_total"] = int(result.tx_total / diff_time) - success_samples["Success_can_be_lost"] = int(result.can_be_lost / diff_time) - success_samples["Success_drop_total"] = int(result.drop_total / diff_time) - self.queue.put(success_samples) - - # Store Actual throughput for result samples - result_samples["Result_Actual_throughput"] = \ - success_samples["Success_RxThroughput"] - else: - LOG.debug("Failure... Decreasing upper bound") - self.current_upper = test_value - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) - - for k in samples: - tmp = samples[k] - if isinstance(tmp, dict): - for k2 in tmp: - samples[k][k2] = int(samples[k][k2] / diff_time) - - if theor_max_thruput < samples["TxThroughput"]: - theor_max_thruput = samples['TxThroughput'] - self.queue.put({'theor_max_throughput': theor_max_thruput}) - - LOG.debug("Collect TG KPIs %s %s", datetime.datetime.now(), samples) - self.queue.put(samples) + ok_retry = traffic_gen.scenario_helper.scenario_cfg["runner"].get("confirmation", 0) + for test_value in self.bounds_iterator(LOG): + pos_retry = 0 + neg_retry = 0 + total_retry = 0 + + rate_samples["MAX_Rate"] = self.current_upper + rate_samples["MIN_Rate"] = self.current_lower + rate_samples["Test_Rate"] = test_value + self.queue.put(rate_samples, True, overall_constants.QUEUE_PUT_TIMEOUT) + LOG.info("Checking MAX %s MIN %s TEST %s", + self.current_upper, self.lower_bound, test_value) + while (pos_retry <= ok_retry) and (neg_retry <= ok_retry): + + total_retry = total_retry + 1 + result, port_samples = self._profile_helper.run_test(pkt_size, duration, + test_value, + self.tolerated_loss, + line_speed) + if (total_retry > (ok_retry * 3)) and (ok_retry is not 0): + LOG.info("Failure.!! .. RETRY EXCEEDED ... decrease lower bound") + + successful_pkt_loss = result.pkt_loss + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + self.current_upper = test_value + neg_retry = total_retry + elif result.success: + if (pos_retry < ok_retry) and (ok_retry is not 0): + neg_retry = 0 + LOG.info("Success! ... confirm retry") + + successful_pkt_loss = result.pkt_loss + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + else: + LOG.info("Success! Increasing lower bound") + self.current_lower = test_value + + successful_pkt_loss = result.pkt_loss + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + # store results with success tag in influxdb + success_samples = \ + {'Success_' + key: value for key, value in samples.items()} + + success_samples["Success_rx_total"] = int(result.rx_total) + success_samples["Success_tx_total"] = int(result.tx_total) + success_samples["Success_can_be_lost"] = int(result.can_be_lost) + success_samples["Success_drop_total"] = int(result.drop_total) + success_samples["Success_RxThroughput"] = samples["RxThroughput"] + success_samples["Success_RxThroughput_gbps"] = \ + (samples["RxThroughput"] / 1000) * ((pkt_size + 20)* 8) + LOG.info(">>>##>>Collect SUCCESS TG KPIs %s %s", + datetime.datetime.now(), success_samples) + self.queue.put(success_samples, True, overall_constants.QUEUE_PUT_TIMEOUT) + + # Store Actual throughput for result samples + actual_max_thruput = success_samples["Success_RxThroughput"] + + pos_retry = pos_retry + 1 + + else: + if (neg_retry < ok_retry) and (ok_retry is not 0): + + pos_retry = 0 + LOG.info("failure! ... confirm retry") + else: + LOG.info("Failure... Decreasing upper bound") + self.current_upper = test_value + + neg_retry = neg_retry + 1 + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + if theor_max_thruput < samples["TxThroughput"]: + theor_max_thruput = samples['TxThroughput'] + self.queue.put({'theor_max_throughput': theor_max_thruput}) + + LOG.info(">>>##>>Collect TG KPIs %s %s", datetime.datetime.now(), samples) + self.queue.put(samples, True, overall_constants.QUEUE_PUT_TIMEOUT) + + LOG.info(">>>##>> Result Reached PktSize %s Theor_Max_Thruput %s Actual_throughput %s", + pkt_size, theor_max_thruput, actual_max_thruput) result_samples["Result_pktSize"] = pkt_size - result_samples["Result_theor_max_throughput"] = theor_max_thruput/(1000 * 1000) + result_samples["Result_theor_max_throughput"] = theor_max_thruput + result_samples["Result_Actual_throughput"] = actual_max_thruput self.queue.put(result_samples) diff --git a/yardstick/network_services/traffic_profile/rfc2544.py b/yardstick/network_services/traffic_profile/rfc2544.py index 83020c85c..c24e2f65a 100644 --- a/yardstick/network_services/traffic_profile/rfc2544.py +++ b/yardstick/network_services/traffic_profile/rfc2544.py @@ -11,190 +11,288 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" RFC2544 Throughput implemenation """ -from __future__ import absolute_import -from __future__ import division import logging -from trex_stl_lib.trex_stl_client import STLStream -from trex_stl_lib.trex_stl_streams import STLFlowLatencyStats -from trex_stl_lib.trex_stl_streams import STLTXCont +from trex_stl_lib import api as Pkt +from trex_stl_lib import trex_stl_client +from trex_stl_lib import trex_stl_packet_builder_scapy +from trex_stl_lib import trex_stl_streams + +from yardstick.network_services.traffic_profile import trex_traffic_profile -from yardstick.network_services.traffic_profile.trex_traffic_profile \ - import TrexProfile LOGGING = logging.getLogger(__name__) +SRC_PORT = 'sport' +DST_PORT = 'dport' + + +class PortPgIDMap(object): + """Port and pg_id mapping class + + "pg_id" is the identification STL library gives to each stream. In the + RFC2544Profile class, the traffic has a STLProfile per port, which contains + one or several streams, one per packet size defined in the IMIX test case + description. + + Example of port <-> pg_id map: + self._port_pg_id_map = { + 0: [1, 2, 3, 4], + 1: [5, 6, 7, 8] + } + """ + + def __init__(self): + self._pg_id = 0 + self._last_port = None + self._port_pg_id_map = {} + + def add_port(self, port): + self._last_port = port + self._port_pg_id_map[port] = [] + + def get_pg_ids(self, port): + return self._port_pg_id_map.get(port) + + def increase_pg_id(self, port=None): + port = self._last_port if not port else port + if port is None: + return + pg_id_list = self._port_pg_id_map.get(port) + if not pg_id_list: + self.add_port(port) + pg_id_list = self._port_pg_id_map[port] + self._pg_id += 1 + pg_id_list.append(self._pg_id) + return self._pg_id -class RFC2544Profile(TrexProfile): - """ This class handles rfc2544 implemenation. """ +class RFC2544Profile(trex_traffic_profile.TrexProfile): + """TRex RFC2544 traffic profile""" + + TOLERANCE_LIMIT = 0.05 def __init__(self, traffic_generator): super(RFC2544Profile, self).__init__(traffic_generator) self.generator = None - self.max_rate = None - self.min_rate = None - self.ports = None - self.rate = 100 - self.drop_percent_at_max_tx = None - self.throughput_max = None + self.rate = self.config.frame_rate + self.max_rate = self.config.frame_rate + self.min_rate = 0 + self.drop_percent_max = 0 def register_generator(self, generator): self.generator = generator - def execute_traffic(self, traffic_generator=None): - """ Generate the stream and run traffic on the given ports """ + def stop_traffic(self, traffic_generator=None): + """"Stop traffic injection, reset counters and remove streams""" if traffic_generator is not None and self.generator is None: self.generator = traffic_generator - if self.ports is not None: - return + self.generator.client.stop() + self.generator.client.reset() + self.generator.client.remove_all_streams() + + def execute_traffic(self, traffic_generator=None): + """Generate the stream and run traffic on the given ports + + :param traffic_generator: (TrexTrafficGenRFC) traffic generator + :return ports: (list of int) indexes of ports + port_pg_id: (dict) port indexes and pg_id [1] map + [1] https://trex-tgn.cisco.com/trex/doc/cp_stl_docs/api/ + profile_code.html#stlstream-modes + """ + if traffic_generator is not None and self.generator is None: + self.generator = traffic_generator - self.ports = [] + port_pg_id = PortPgIDMap() + ports = [] for vld_id, intfs in sorted(self.generator.networks.items()): profile_data = self.params.get(vld_id) - # no profile for this port if not profile_data: continue - # correlated traffic doesn't use public traffic? - if vld_id.startswith(self.DOWNLINK) and \ - self.generator.rfc2544_helper.correlated_traffic: + if (vld_id.startswith(self.DOWNLINK) and + self.generator.rfc2544_helper.correlated_traffic): continue for intf in intfs: - port = self.generator.port_num(intf) - self.ports.append(port) - self.generator.client.add_streams(self.get_streams(profile_data), ports=port) - - self.max_rate = self.rate - self.min_rate = 0 - self.generator.client.start(ports=self.ports, mult=self.get_multiplier(), - duration=30, force=True) - self.drop_percent_at_max_tx = 0 - self.throughput_max = 0 - - def get_multiplier(self): - """ Get the rate at which next iteration to run """ - self.rate = round((self.max_rate + self.min_rate) / 2.0, 2) - multiplier = round(self.rate / self.pps, 2) - return str(multiplier) - - def get_drop_percentage(self, generator=None): - """ Calculate the drop percentage and run the traffic """ - if generator is None: - generator = self.generator - run_duration = self.generator.RUN_DURATION - samples = self.generator.generate_samples(self.ports) - - in_packets = sum([value['in_packets'] for value in samples.values()]) - out_packets = sum([value['out_packets'] for value in samples.values()]) - - packet_drop = abs(out_packets - in_packets) - drop_percent = 100.0 - try: - drop_percent = round((packet_drop / float(out_packets)) * 100, 5) - except ZeroDivisionError: - LOGGING.info('No traffic is flowing') + port_num = int(self.generator.port_num(intf)) + ports.append(port_num) + port_pg_id.add_port(port_num) + profile = self._create_profile(profile_data, + self.rate, port_pg_id) + self.generator.client.add_streams(profile, ports=[port_num]) + + self.generator.client.start(ports=ports, + duration=self.config.duration, + force=True) + return ports, port_pg_id + + def _create_profile(self, profile_data, rate, port_pg_id): + """Create a STL profile (list of streams) for a port""" + streams = [] + for packet_name in profile_data: + imix = (profile_data[packet_name]. + get('outer_l2', {}).get('framesize')) + imix_data = self._create_imix_data(imix) + self._create_vm(profile_data[packet_name]) + _streams = self._create_streams(imix_data, rate, port_pg_id) + streams.extend(_streams) + return trex_stl_streams.STLProfile(streams) + + def _create_imix_data(self, imix): + """Generate the IMIX distribution for a STL profile + + The input information is the framesize dictionary in a test case + traffic profile definition. E.g.: + downlink_0: + ipv4: + id: 2 + outer_l2: + framesize: + 64B: 10 + 128B: 20 + ... + + This function normalizes the sum of framesize weights to 100 and + returns a dictionary of frame sizes in bytes and weight in percentage. + E.g.: + imix_count = {64: 25, 128: 75} + + :param imix: (dict) IMIX size and weight + """ + imix_count = {} + if not imix: + return imix_count + + imix_count = {size.upper().replace('B', ''): int(weight) + for size, weight in imix.items()} + imix_sum = sum(imix_count.values()) + if imix_sum <= 0: + imix_count = {64: 100} + imix_sum = 100 + + weight_normalize = float(imix_sum) / 100 + return {size: float(weight) / weight_normalize + for size, weight in imix_count.items()} + + def _create_vm(self, packet_definition): + """Create the STL Raw instructions""" + self.ether_packet = Pkt.Ether() + self.ip_packet = Pkt.IP() + self.ip6_packet = None + self.udp_packet = Pkt.UDP() + self.udp[DST_PORT] = 'UDP.dport' + self.udp[SRC_PORT] = 'UDP.sport' + self.qinq = False + self.vm_flow_vars = [] + outer_l2 = packet_definition.get('outer_l2') + outer_l3v4 = packet_definition.get('outer_l3v4') + outer_l3v6 = packet_definition.get('outer_l3v6') + outer_l4 = packet_definition.get('outer_l4') + if outer_l2: + self._set_outer_l2_fields(outer_l2) + if outer_l3v4: + self._set_outer_l3v4_fields(outer_l3v4) + if outer_l3v6: + self._set_outer_l3v6_fields(outer_l3v6) + if outer_l4: + self._set_outer_l4_fields(outer_l4) + self.trex_vm = trex_stl_packet_builder_scapy.STLScVmRaw( + self.vm_flow_vars) + + def _create_single_packet(self, size=64): + size -= 4 + ether_packet = self.ether_packet + ip_packet = self.ip6_packet if self.ip6_packet else self.ip_packet + udp_packet = self.udp_packet + if self.qinq: + qinq_packet = self.qinq_packet + base_pkt = ether_packet / qinq_packet / ip_packet / udp_packet + else: + base_pkt = ether_packet / ip_packet / udp_packet + pad = max(0, size - len(base_pkt)) * 'x' + return trex_stl_packet_builder_scapy.STLPktBuilder( + pkt=base_pkt / pad, vm=self.trex_vm) + + def _create_streams(self, imix_data, rate, port_pg_id): + """Create a list of streams per packet size + + The STL TX mode speed of the generated streams will depend on the frame + weight and the frame rate. Both the frame weight and the total frame + rate are normalized to 100. The STL TX mode speed, defined in + percentage, is the combitation of both percentages. E.g.: + frame weight = 100 + rate = 90 + --> STLTXmode percentage = 10 (%) + + frame weight = 80 + rate = 50 + --> STLTXmode percentage = 40 (%) + + :param imix_data: (dict) IMIX size and weight + :param rate: (float) normalized [0..100] total weight + :param pg_id: (PortPgIDMap) port / pg_id (list) map + """ + streams = [] + for size, weight in ((int(size), float(weight)) for (size, weight) + in imix_data.items() if float(weight) > 0): + packet = self._create_single_packet(size) + pg_id = port_pg_id.increase_pg_id() + stl_flow = trex_stl_streams.STLFlowLatencyStats(pg_id=pg_id) + mode = trex_stl_streams.STLTXCont(percentage=weight * rate / 100) + streams.append(trex_stl_client.STLStream( + packet=packet, flow_stats=stl_flow, mode=mode)) + return streams + + def get_drop_percentage(self, samples, tol_low, tol_high, + correlated_traffic): + """Calculate the drop percentage and run the traffic""" + tx_rate_fps = 0 + rx_rate_fps = 0 + for sample in samples: + tx_rate_fps += sum( + port['tx_throughput_fps'] for port in sample.values()) + rx_rate_fps += sum( + port['rx_throughput_fps'] for port in sample.values()) + tx_rate_fps = round(float(tx_rate_fps) / len(samples), 2) + rx_rate_fps = round(float(rx_rate_fps) / len(samples), 2) # TODO(esm): RFC2544 doesn't tolerate packet loss, why do we? - tolerance_low = generator.rfc2544_helper.tolerance_low - tolerance_high = generator.rfc2544_helper.tolerance_high - - tx_rate = out_packets / run_duration - rx_rate = in_packets / run_duration - - throughput_max = self.throughput_max - drop_percent_at_max_tx = self.drop_percent_at_max_tx + out_packets = sum(port['out_packets'] for port in samples[-1].values()) + in_packets = sum(port['in_packets'] for port in samples[-1].values()) + drop_percent = 100.0 - if self.drop_percent_at_max_tx is None: - self.rate = tx_rate - self.first_run = False + # https://tools.ietf.org/html/rfc2544#section-26.3 + if out_packets: + drop_percent = round( + (float(abs(out_packets - in_packets)) / out_packets) * 100, 5) - if drop_percent > tolerance_high: - # TODO(esm): why don't we discard results that are out of tolerance? + tol_high = tol_high if tol_high > self.TOLERANCE_LIMIT else tol_high + tol_low = tol_low if tol_low > self.TOLERANCE_LIMIT else tol_low + if drop_percent > tol_high: self.max_rate = self.rate - if throughput_max == 0: - throughput_max = rx_rate - drop_percent_at_max_tx = drop_percent - - elif drop_percent >= tolerance_low: - # TODO(esm): why do we update the samples dict in this case - # and not update our tracking values? - throughput_max = rx_rate - drop_percent_at_max_tx = drop_percent - - elif drop_percent >= self.drop_percent_at_max_tx: - # TODO(esm): why don't we discard results that are out of tolerance? + elif drop_percent < tol_low: self.min_rate = self.rate - self.drop_percent_at_max_tx = drop_percent_at_max_tx = drop_percent - self.throughput_max = throughput_max = rx_rate + # else: + # NOTE(ralonsoh): the test should finish here + # pass + last_rate = self.rate + self.rate = round(float(self.max_rate + self.min_rate) / 2.0, 5) - else: - # TODO(esm): why don't we discard results that are out of tolerance? - self.min_rate = self.rate + throughput = rx_rate_fps * 2 if correlated_traffic else rx_rate_fps - generator.clear_client_stats(self.ports) - generator.start_client(self.ports, mult=self.get_multiplier(), - duration=run_duration, force=True) + if drop_percent > self.drop_percent_max: + self.drop_percent_max = drop_percent - # if correlated traffic update the Throughput - if generator.rfc2544_helper.correlated_traffic: - throughput_max *= 2 + latency = {port_num: value['latency'] + for port_num, value in samples[-1].items()} - samples.update({ - 'TxThroughput': tx_rate, - 'RxThroughput': rx_rate, + output = { + 'TxThroughput': tx_rate_fps, + 'RxThroughput': rx_rate_fps, 'CurrentDropPercentage': drop_percent, - 'Throughput': throughput_max, - 'DropPercentage': drop_percent_at_max_tx, - }) - - return samples - - def execute_latency(self, generator=None, samples=None): - if generator is not None and self.generator is None: - self.generator = generator - - if samples is None: - samples = self.generator.generate_samples() - - self.pps, multiplier = self.calculate_pps(samples) - self.ports = [] - self.pg_id = self.params['traffic_profile'].get('pg_id', 1) - for vld_id, intfs in sorted(self.generator.networks.items()): - profile_data = self.params.get(vld_id) - if not profile_data: - continue - # correlated traffic doesn't use public traffic? - if vld_id.startswith(self.DOWNLINK) and \ - self.generator.rfc2544_helper.correlated_traffic: - continue - for intf in intfs: - port = self.generator.port_num(intf) - self.ports.append(port) - self.generator.client.add_streams(self.get_streams(profile_data), ports=port) - - self.generator.start_client(ports=self.ports, mult=str(multiplier), - duration=120, force=True) - self.first_run = False - - def calculate_pps(self, samples): - pps = round(samples['Throughput'] / 2, 2) - multiplier = round(self.rate / self.pps, 2) - return pps, multiplier - - def create_single_stream(self, packet_size, pps, isg=0): - packet = self._create_single_packet(packet_size) - if pps: - stl_mode = STLTXCont(pps=pps) - else: - stl_mode = STLTXCont(pps=self.pps) - if self.pg_id: - LOGGING.debug("pg_id: %s", self.pg_id) - stl_flow_stats = STLFlowLatencyStats(pg_id=self.pg_id) - stream = STLStream(isg=isg, packet=packet, mode=stl_mode, - flow_stats=stl_flow_stats) - self.pg_id += 1 - else: - stream = STLStream(isg=isg, packet=packet, mode=stl_mode) - return stream + 'Throughput': throughput, + 'DropPercentage': self.drop_percent_max, + 'Rate': last_rate, + 'Latency': latency + } + return output diff --git a/yardstick/network_services/traffic_profile/trex_traffic_profile.py b/yardstick/network_services/traffic_profile/trex_traffic_profile.py index f5e3923d5..ed0355fa5 100644 --- a/yardstick/network_services/traffic_profile/trex_traffic_profile.py +++ b/yardstick/network_services/traffic_profile/trex_traffic_profile.py @@ -19,21 +19,16 @@ from random import SystemRandom import ipaddress import six - -from yardstick.common import exceptions as y_exc -from yardstick.network_services.traffic_profile import base -from trex_stl_lib.trex_stl_client import STLStream -from trex_stl_lib.trex_stl_streams import STLFlowLatencyStats -from trex_stl_lib.trex_stl_streams import STLTXCont -from trex_stl_lib.trex_stl_streams import STLProfile from trex_stl_lib.trex_stl_packet_builder_scapy import STLVmWrFlowVar from trex_stl_lib.trex_stl_packet_builder_scapy import STLVmFlowVarRepeatableRandom from trex_stl_lib.trex_stl_packet_builder_scapy import STLVmFlowVar -from trex_stl_lib.trex_stl_packet_builder_scapy import STLPktBuilder -from trex_stl_lib.trex_stl_packet_builder_scapy import STLScVmRaw from trex_stl_lib.trex_stl_packet_builder_scapy import STLVmFixIpv4 from trex_stl_lib import api as Pkt +from yardstick.common import exceptions as y_exc +from yardstick.network_services.traffic_profile import base + + SRC = 'src' DST = 'dst' ETHERNET = 'Ethernet' @@ -342,115 +337,6 @@ class TrexProfile(base.TrafficProfile): if 'dstport' in outer_l4: self._set_proto_addr(UDP, DST_PORT, outer_l4['dstport'], outer_l4['count']) - def generate_imix_data(self, packet_definition): - """ generate packet size for a given traffic profile """ - imix_count = {} - imix_data = {} - if not packet_definition: - return imix_count - imix = packet_definition.get('framesize') - if imix: - for size in imix: - data = imix[size] - imix_data[int(size[:-1])] = int(data) - imix_sum = sum(imix_data.values()) - if imix_sum > 100: - raise SystemExit("Error in IMIX data") - elif imix_sum < 100: - imix_data[64] = imix_data.get(64, 0) + (100 - imix_sum) - - avg_size = 0.0 - for size in imix_data: - count = int(imix_data[size]) - if count: - avg_size += round(size * count / 100, 2) - pps = round(self.pps * count / 100, 0) - imix_count[size] = pps - self.rate = round(1342177280 / avg_size, 0) * 2 - logging.debug("Imax: %s rate: %s", imix_count, self.rate) - return imix_count - - def get_streams(self, profile_data): - """ generate trex stream - :param profile_data: - :type profile_data: - """ - self.streams = [] - self.pps = self.params['traffic_profile'].get('frame_rate', 100) - for packet_name in profile_data: - outer_l2 = profile_data[packet_name].get('outer_l2') - imix_data = self.generate_imix_data(outer_l2) - if not imix_data: - imix_data = {64: self.pps} - self.generate_vm(profile_data[packet_name]) - for size in imix_data: - self._generate_streams(size, imix_data[size]) - self._generate_profile() - return self.profile - - def generate_vm(self, packet_definition): - """ generate trex vm with flows setup """ - self.ether_packet = Pkt.Ether() - self.ip_packet = Pkt.IP() - self.ip6_packet = None - self.udp_packet = Pkt.UDP() - self.udp[DST_PORT] = 'UDP.dport' - self.udp[SRC_PORT] = 'UDP.sport' - self.qinq = False - self.vm_flow_vars = [] - outer_l2 = packet_definition.get('outer_l2', None) - outer_l3v4 = packet_definition.get('outer_l3v4', None) - outer_l3v6 = packet_definition.get('outer_l3v6', None) - outer_l4 = packet_definition.get('outer_l4', None) - if outer_l2: - self._set_outer_l2_fields(outer_l2) - if outer_l3v4: - self._set_outer_l3v4_fields(outer_l3v4) - if outer_l3v6: - self._set_outer_l3v6_fields(outer_l3v6) - if outer_l4: - self._set_outer_l4_fields(outer_l4) - self.trex_vm = STLScVmRaw(self.vm_flow_vars) - - def generate_packets(self): - """ generate packets from trex TG """ - base_pkt = self.base_pkt - size = self.fsize - 4 - pad = max(0, size - len(base_pkt)) * 'x' - self.packets = [STLPktBuilder(pkt=base_pkt / pad, - vm=vm) for vm in self.vms] - - def _create_single_packet(self, size=64): - size = size - 4 - ether_packet = self.ether_packet - ip_packet = self.ip6_packet if self.ip6_packet else self.ip_packet - udp_packet = self.udp_packet - if self.qinq: - qinq_packet = self.qinq_packet - base_pkt = ether_packet / qinq_packet / ip_packet / udp_packet - else: - base_pkt = ether_packet / ip_packet / udp_packet - pad = max(0, size - len(base_pkt)) * 'x' - packet = STLPktBuilder(pkt=base_pkt / pad, vm=self.trex_vm) - return packet - - def _create_single_stream(self, packet_size, pps, isg=0): - packet = self._create_single_packet(packet_size) - if self.pg_id: - self.pg_id += 1 - stl_flow = STLFlowLatencyStats(pg_id=self.pg_id) - stream = STLStream(isg=isg, packet=packet, mode=STLTXCont(pps=pps), - flow_stats=stl_flow) - else: - stream = STLStream(isg=isg, packet=packet, mode=STLTXCont(pps=pps)) - return stream - - def _generate_streams(self, packet_size, pps): - self.streams.append(self._create_single_stream(packet_size, pps)) - - def _generate_profile(self): - self.profile = STLProfile(self.streams) - @classmethod def _count_ip(cls, start_ip, end_ip): start = ipaddress.ip_address(six.u(start_ip)) diff --git a/yardstick/network_services/vnf_generic/vnf/acl_vnf.py b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py index d9719eb4e..8e9bc87e1 100644 --- a/yardstick/network_services/vnf_generic/vnf/acl_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py @@ -13,10 +13,14 @@ # limitations under the License. import logging - +import ipaddress +import six from yardstick.common import utils +from yardstick.common import exceptions + from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper -from yardstick.network_services.yang_model import YangModel +from yardstick.network_services.helpers.samplevnf_helper import PortPairs +from itertools import chain LOG = logging.getLogger(__name__) @@ -38,6 +42,196 @@ class AclApproxSetupEnvSetupEnvHelper(DpdkVnfSetupEnvHelper): SW_DEFAULT_CORE = 5 DEFAULT_CONFIG_TPL_CFG = "acl.cfg" VNF_TYPE = "ACL" + RULE_CMD = "acl" + + DEFAULT_PRIORITY = 1 + DEFAULT_PROTOCOL = 0 + DEFAULT_PROTOCOL_MASK = 0 + # Default actions to be applied to SampleVNF. Please note, + # that this list is extended with `fwd` action when default + # actions are generated. + DEFAULT_FWD_ACTIONS = ["accept", "count"] + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(AclApproxSetupEnvSetupEnvHelper, self).__init__(vnfd_helper, + ssh_helper, + scenario_helper) + self._action_id = 0 + + def get_ip_from_port(self, port): + # we can't use gateway because in OpenStack gateways interfere with floating ip routing + # return self.make_ip_addr(self.get_ports_gateway(port), self.get_netmask_gateway(port)) + vintf = self.vnfd_helper.find_interface(name=port)["virtual-interface"] + return utils.make_ip_addr(vintf["local_ip"], vintf["netmask"]) + + def get_network_and_prefixlen_from_ip_of_port(self, port): + ip_addr = self.get_ip_from_port(port) + # handle cases with no gateway + if ip_addr: + return ip_addr.network.network_address.exploded, ip_addr.network.prefixlen + else: + return None, None + + @property + def new_action_id(self): + """Get new action id""" + self._action_id += 1 + return self._action_id + + def get_default_flows(self): + """Get default actions/rules + Returns: (<actions>, <rules>) + <actions>: + { <action_id>: [ <list of actions> ]} + Example: + { 0 : [ "accept", "count", {"fwd" : "port": 0} ], ... } + <rules>: + [ {"src_ip": "x.x.x.x", "src_ip_mask", 24, ...}, ... ] + Note: + See `generate_rule_cmds()` to get list of possible map keys. + """ + actions, rules = {}, [] + _port_pairs = PortPairs(self.vnfd_helper.interfaces) + port_pair_list = _port_pairs.port_pair_list + for src_intf, dst_intf in port_pair_list: + # get port numbers of the interfaces + src_port = self.vnfd_helper.port_num(src_intf) + dst_port = self.vnfd_helper.port_num(dst_intf) + # get interface addresses and prefixes + src_net, src_prefix_len = self.get_network_and_prefixlen_from_ip_of_port(src_intf) + dst_net, dst_prefix_len = self.get_network_and_prefixlen_from_ip_of_port(dst_intf) + # ignore entries with empty values + if all((src_net, src_prefix_len, dst_net, dst_prefix_len)): + # flow: src_net:dst_net -> dst_port + action_id = self.new_action_id + actions[action_id] = self.DEFAULT_FWD_ACTIONS[:] + actions[action_id].append({"fwd": {"port": dst_port}}) + rules.append({"priority": 1, 'cmd': self.RULE_CMD, + "src_ip": src_net, "src_ip_mask": src_prefix_len, + "dst_ip": dst_net, "dst_ip_mask": dst_prefix_len, + "src_port_from": 0, "src_port_to": 65535, + "dst_port_from": 0, "dst_port_to": 65535, + "protocol": 0, "protocol_mask": 0, + "action_id": action_id}) + # flow: dst_net:src_net -> src_port + action_id = self.new_action_id + actions[action_id] = self.DEFAULT_FWD_ACTIONS[:] + actions[action_id].append({"fwd": {"port": src_port}}) + rules.append({"cmd":self.RULE_CMD, "priority": 1, + "src_ip": dst_net, "src_ip_mask": dst_prefix_len, + "dst_ip": src_net, "dst_ip_mask": src_prefix_len, + "src_port_from": 0, "src_port_to": 65535, + "dst_port_from": 0, "dst_port_to": 65535, + "protocol": 0, "protocol_mask": 0, + "action_id": action_id}) + return actions, rules + + def get_flows(self, options): + """Get actions/rules based on provided options. + The `options` is a dict representing the ACL rules configuration + file. Result is the same as described in `get_default_flows()`. + """ + actions, rules = {}, [] + for ace in options['access-list-entries']: + # Generate list of actions + action_id = self.new_action_id + actions[action_id] = ace['actions'] + # Destination nestwork + matches = ace['matches'] + dst_ipv4_net = matches['destination-ipv4-network'] + dst_ipv4_net_ip = ipaddress.ip_interface(six.text_type(dst_ipv4_net)) + # Source network + src_ipv4_net = matches['source-ipv4-network'] + src_ipv4_net_ip = ipaddress.ip_interface(six.text_type(src_ipv4_net)) + # Append the rule + rules.append({'action_id': action_id, 'cmd': self.RULE_CMD, + 'dst_ip': dst_ipv4_net_ip.network.network_address.exploded, + 'dst_ip_mask': dst_ipv4_net_ip.network.prefixlen, + 'src_ip': src_ipv4_net_ip.network.network_address.exploded, + 'src_ip_mask': src_ipv4_net_ip.network.prefixlen, + 'dst_port_from': matches['destination-port-range']['lower-port'], + 'dst_port_to': matches['destination-port-range']['upper-port'], + 'src_port_from': matches['source-port-range']['lower-port'], + 'src_port_to': matches['source-port-range']['upper-port'], + 'priority': matches.get('priority', self.DEFAULT_PRIORITY), + 'protocol': matches.get('protocol', self.DEFAULT_PROTOCOL), + 'protocol_mask': matches.get('protocol_mask', + self.DEFAULT_PROTOCOL_MASK) + }) + return actions, rules + + def generate_rule_cmds(self, rules, apply_rules=False): + """Convert rules into list of SampleVNF CLI commands""" + rule_template = ("p {cmd} add {priority} {src_ip} {src_ip_mask} " + "{dst_ip} {dst_ip_mask} {src_port_from} {src_port_to} " + "{dst_port_from} {dst_port_to} {protocol} " + "{protocol_mask} {action_id}") + rule_cmd_list = [] + for rule in rules: + rule_cmd_list.append(rule_template.format(**rule)) + if apply_rules: + # add command to apply all rules at the end + rule_cmd_list.append("p {cmd} applyruleset".format(cmd=self.RULE_CMD)) + return rule_cmd_list + + def generate_action_cmds(self, actions): + """Convert actions into list of SampleVNF CLI commands. + These method doesn't validate the provided list of actions. Supported + list of actions are limited by SampleVNF. Thus, the user should be + responsible to specify correct action name(s). Yardstick should take + the provided action by user and apply it to SampleVNF. + Anyway, some of the actions require addition parameters to be + specified. In case of `fwd` & `nat` action used have to specify + the port attribute. + """ + _action_template_map = { + "fwd": "p action add {action_id} fwd {port}", + "nat": "p action add {action_id} nat {port}" + } + action_cmd_list = [] + for action_id, actions in actions.items(): + for action in actions: + if isinstance(action, dict): + for action_name in action.keys(): + # user provided an action name with addition options + # e.g.: {"fwd": {"port": 0}} + # format action CLI command and add it to the list + if action_name not in _action_template_map.keys(): + raise exceptions.AclUknownActionTemplate( + action_name=action_name) + template = _action_template_map[action_name] + try: + action_cmd_list.append(template.format( + action_id=action_id, **action[action_name])) + except KeyError as exp: + raise exceptions.AclMissingActionArguments( + action_name=action_name, + action_param=exp.args[0]) + else: + # user provided an action name w/o addition options + # e.g.: "accept", "count" + action_cmd_list.append( + "p action add {action_id} {action}".format( + action_id=action_id, action=action)) + return action_cmd_list + + def get_flows_config(self, options=None): + """Get action/rules configuration commands (string) to be + applied to SampleVNF to configure ACL rules (flows). + """ + action_cmd_list, rule_cmd_list = [], [] + if options: + # if file name is set, read actions/rules from the file + actions, rules = self.get_flows(options) + action_cmd_list = self.generate_action_cmds(actions) + rule_cmd_list = self.generate_rule_cmds(rules) + # default actions/rules + dft_actions, dft_rules = self.get_default_flows() + dft_action_cmd_list = self.generate_action_cmds(dft_actions) + dft_rule_cmd_list = self.generate_rule_cmds(dft_rules, apply_rules=True) + # generate multi-line commands to add actions/rules + return '\n'.join(chain(action_cmd_list, dft_action_cmd_list, + rule_cmd_list, dft_rule_cmd_list)) class AclApproxVnf(SampleVNF): @@ -52,17 +246,9 @@ class AclApproxVnf(SampleVNF): 'packets_dropped': 2, } - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = AclApproxSetupEnvSetupEnvHelper - - super(AclApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) - self.acl_rules = None - - def _start_vnf(self): - yang_model_path = utils.find_relative_file( - self.scenario_helper.options['rules'], - self.scenario_helper.task_path) - yang_model = YangModel(yang_model_path) - self.acl_rules = yang_model.get_rules() - super(AclApproxVnf, self)._start_vnf() + super(AclApproxVnf, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 9ceac3167..0fb310075 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -11,13 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Base class implementation for generic vnf implementation """ import abc import logging import six +from yardstick.common import messaging +from yardstick.common.messaging import consumer +from yardstick.common.messaging import payloads +from yardstick.common.messaging import producer from yardstick.network_services.helpers.samplevnf_helper import PortPairs @@ -138,6 +141,70 @@ class VnfdHelper(dict): yield port_name, port_num +class TrafficGeneratorProducer(producer.MessagingProducer): + """Class implementing the message producer for traffic generators + + This message producer must be instantiated in the process created + "run_traffic" process. + """ + def __init__(self, _id): + super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG, + _id=_id) + + def tg_method_started(self, version=1): + """Send a message to inform the traffic generation has started""" + self.send_message( + messaging.TG_METHOD_STARTED, + payloads.TrafficGeneratorPayload(version=version, iteration=0, + kpi={})) + + def tg_method_finished(self, version=1): + """Send a message to inform the traffic generation has finished""" + self.send_message( + messaging.TG_METHOD_FINISHED, + payloads.TrafficGeneratorPayload(version=version, iteration=0, + kpi={})) + + def tg_method_iteration(self, iteration, version=1, kpi=None): + """Send a message, with KPI, once an iteration has finished""" + kpi = {} if kpi is None else kpi + self.send_message( + messaging.TG_METHOD_ITERATION, + payloads.TrafficGeneratorPayload(version=version, + iteration=iteration, kpi=kpi)) + + +@six.add_metaclass(abc.ABCMeta) +class GenericVNFEndpoint(consumer.NotificationHandler): + """Endpoint class for ``GenericVNFConsumer``""" + + @abc.abstractmethod + def runner_method_start_iteration(self, ctxt, **kwargs): + """Endpoint when RUNNER_METHOD_START_ITERATION is received + + :param ctxt: (dict) {'id': <Producer ID>} + :param kwargs: (dict) ``payloads.RunnerPayload`` context + """ + + @abc.abstractmethod + def runner_method_stop_iteration(self, ctxt, **kwargs): + """Endpoint when RUNNER_METHOD_STOP_ITERATION is received + + :param ctxt: (dict) {'id': <Producer ID>} + :param kwargs: (dict) ``payloads.RunnerPayload`` context + """ + + +class GenericVNFConsumer(consumer.MessagingConsumer): + """MQ consumer for ``GenericVNF`` derived classes""" + + def __init__(self, ctx_ids, endpoints): + if not isinstance(endpoints, list): + endpoints = [endpoints] + super(GenericVNFConsumer, self).__init__(messaging.TOPIC_RUNNER, + ctx_ids, endpoints) + + @six.add_metaclass(abc.ABCMeta) class GenericVNF(object): """Class providing file-like API for generic VNF implementation @@ -150,8 +217,9 @@ class GenericVNF(object): UPLINK = PortPairs.UPLINK DOWNLINK = PortPairs.DOWNLINK - def __init__(self, name, vnfd): + def __init__(self, name, vnfd, task_id): self.name = name + self._task_id = task_id self.vnfd_helper = VnfdHelper(vnfd) # List of statistics we can obtain from this VNF # - ETSI MANO 6.3.1.1 monitoring_parameter @@ -210,12 +278,13 @@ class GenericVNF(object): @six.add_metaclass(abc.ABCMeta) class GenericTrafficGen(GenericVNF): - """ Class providing file-like API for generic traffic generator """ + """Class providing file-like API for generic traffic generator""" - def __init__(self, name, vnfd): - super(GenericTrafficGen, self).__init__(name, vnfd) + def __init__(self, name, vnfd, task_id): + super(GenericTrafficGen, self).__init__(name, vnfd, task_id) self.runs_traffic = True self.traffic_finished = False + self._mq_producer = None @abc.abstractmethod def run_traffic(self, traffic_profile): @@ -286,3 +355,16 @@ class GenericTrafficGen(GenericVNF): :return: True/False """ pass + + @staticmethod + def _setup_mq_producer(id): + """Setup the TG MQ producer to send messages between processes + + :return: (derived class from ``MessagingProducer``) MQ producer object + """ + return TrafficGeneratorProducer(id) + + def get_mq_producer_id(self): + """Return the MQ producer ID if initialized""" + if self._mq_producer: + return self._mq_producer.id diff --git a/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py index bfe628f09..14f1e2e97 100644 --- a/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py @@ -85,12 +85,12 @@ class CgnaptApproxVnf(SampleVNF): "packets_dropped": 4, } - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = CgnaptApproxSetupEnvHelper - - super(CgnaptApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, - resource_helper_type) + super(CgnaptApproxVnf, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) def _vnf_up_post(self): super(CgnaptApproxVnf, self)._vnf_up_post() diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 31ed30140..3241719e8 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -44,6 +44,8 @@ SECTION_CONTENTS = 1 LOG = logging.getLogger(__name__) LOG.setLevel(logging.DEBUG) +LOG_RESULT = logging.getLogger('yardstick') +LOG_RESULT.setLevel(logging.DEBUG) BITS_PER_BYTE = 8 RETRY_SECONDS = 60 @@ -123,7 +125,8 @@ class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')): class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,' 'delta_tx,delta_tsc,' - 'latency,rx_total,tx_total,pps')): + 'latency,rx_total,tx_total,' + 'requested_pps')): @property def pkt_loss(self): try: @@ -132,11 +135,16 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ return 100.0 @property - def mpps(self): + def tx_mpps(self): # calculate the effective throughput in Mpps return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6 @property + def rx_mpps(self): + # calculate the effective throughput in Mpps + return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6 + + @property def can_be_lost(self): return int(self.tx_total * self.tolerated / 1e2) @@ -162,11 +170,12 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ ] samples = { - "Throughput": self.mpps, + "Throughput": self.rx_mpps, + "RxThroughput": self.rx_mpps, "DropPackets": pkt_loss, "CurrentDropPackets": pkt_loss, - "TxThroughput": self.pps / 1e6, - "RxThroughput": self.mpps, + "RequestedTxThroughput": self.requested_pps / 1e6, + "TxThroughput": self.tx_mpps, "PktSize": pkt_size, } if port_samples: @@ -177,11 +186,12 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ def log_data(self, logger=None): if logger is None: - logger = LOG + logger = LOG_RESULT template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)" - logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost) - logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps) + logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost) + logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f", + self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps) class PacketDump(object): @@ -288,7 +298,7 @@ class ProxSocketHelper(object): if mode != 'pktdump': # Regular 1-line message. Stop reading from the socket. LOG.debug("Regular response read") - return ret_str + return ret_str, True LOG.debug("Packet dump header read: [%s]", ret_str) @@ -309,13 +319,13 @@ class ProxSocketHelper(object): # Return boolean instead of string to signal # successful reception of the packet dump. LOG.debug("Packet dump stored, returning") - return True + return True, False index = data_end + 1 - return ret_str + return ret_str, False - def get_data(self, pkt_dump_only=False, timeout=1): + def get_data(self, pkt_dump_only=False, timeout=0.01): """ read data from the socket """ # This method behaves slightly differently depending on whether it is @@ -352,7 +362,9 @@ class ProxSocketHelper(object): ret_str = "" for status in iter(is_ready, False): decoded_data = self._sock.recv(256).decode('utf-8') - ret_str = self._parse_socket_data(decoded_data, pkt_dump_only) + ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only) + if (done): + break LOG.debug("Received data from socket: [%s]", ret_str) return ret_str if status else '' @@ -386,8 +398,14 @@ class ProxSocketHelper(object): def stop(self, cores, task=''): """ stop specific cores on the remote instance """ - LOG.debug("Stopping cores %s", cores) - self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task)) + + tmpcores = [] + for core in cores: + if core not in tmpcores: + tmpcores.append(core) + + LOG.debug("Stopping cores %s", tmpcores) + self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task)) time.sleep(3) def start_all(self): @@ -397,8 +415,14 @@ class ProxSocketHelper(object): def start(self, cores): """ start specific cores on the remote instance """ - LOG.debug("Starting cores %s", cores) - self.put_command("start {}\n".format(join_non_strings(',', cores))) + + tmpcores = [] + for core in cores: + if core not in tmpcores: + tmpcores.append(core) + + LOG.debug("Starting cores %s", tmpcores) + self.put_command("start {}\n".format(join_non_strings(',', tmpcores))) time.sleep(3) def reset_stats(self): @@ -520,6 +544,51 @@ class ProxSocketHelper(object): tsc = int(ret[3]) return rx, tx, drop, tsc + def multi_port_stats(self, ports): + """get counter values from all ports port""" + + ports_str = "" + for port in ports: + ports_str = ports_str + str(port) + "," + ports_str = ports_str[:-1] + + ports_all_data = [] + tot_result = [0] * len(ports) + + retry_counter = 0 + port_index = 0 + while (len(ports) is not len(ports_all_data)) and (retry_counter < 10): + self.put_command("multi port stats {}\n".format(ports_str)) + ports_all_data = self.get_data().split(";") + + if len(ports) is len(ports_all_data): + for port_data_str in ports_all_data: + + try: + tot_result[port_index] = [try_int(s, 0) for s in port_data_str.split(",")] + except (IndexError, TypeError): + LOG.error("Port Index error %d %s - retrying ", port_index, port_data_str) + + if (len(tot_result[port_index]) is not 6) or \ + tot_result[port_index][0] is not ports[port_index]: + ports_all_data = [] + tot_result = [0] * len(ports) + port_index = 0 + time.sleep(0.1) + LOG.error("Corrupted PACKET %s - retrying", port_data_str) + break + else: + port_index = port_index + 1 + else: + LOG.error("Empty / too much data - retry -%s-", ports_all_data) + ports_all_data = [] + tot_result = [0] * len(ports) + port_index = 0 + time.sleep(0.1) + + retry_counter = retry_counter + 1 + return tot_result + def port_stats(self, ports): """get counter values from a specific port""" tot_result = [0] * 12 @@ -900,7 +969,7 @@ class ProxResourceHelper(ClientResourceHelper): self._test_type = self.setup_helper.find_in_section('global', 'name', None) return self._test_type - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, *args): self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 @@ -1000,9 +1069,13 @@ class ProxDataHelper(object): @property def totals_and_pps(self): if self._totals_and_pps is None: - rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8] - pps = self.value / 100.0 * self.line_rate_to_pps() - self._totals_and_pps = rx_total, tx_total, pps + rx_total = tx_total = 0 + all_ports = self.sut.multi_port_stats(range(self.port_count)) + for port in all_ports: + rx_total = rx_total + port[1] + tx_total = tx_total + port[2] + requested_pps = self.value / 100.0 * self.line_rate_to_pps() + self._totals_and_pps = rx_total, tx_total, requested_pps return self._totals_and_pps @property @@ -1014,25 +1087,24 @@ class ProxDataHelper(object): return self.totals_and_pps[1] @property - def pps(self): + def requested_pps(self): return self.totals_and_pps[2] @property def samples(self): samples = {} + ports = [] + port_names = [] for port_name, port_num in self.vnfd_helper.ports_iter(): - try: - port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8] - samples[port_name] = { - "in_packets": port_rx_total, - "out_packets": port_tx_total, - } - except (KeyError, TypeError, NameError, MemoryError, ValueError, - SystemError, BufferError): - samples[port_name] = { - "in_packets": 0, - "out_packets": 0, - } + ports.append(port_num) + port_names.append(port_name) + + results = self.sut.multi_port_stats(ports) + for result in results: + port_num = result[0] + samples[port_names[port_num]] = { + "in_packets": result[1], + "out_packets": result[2]} return samples def __enter__(self): @@ -1055,7 +1127,7 @@ class ProxDataHelper(object): self.latency, self.rx_total, self.tx_total, - self.pps, + self.requested_pps, ) self.result_tuple.log_data() @@ -1134,6 +1206,7 @@ class ProxProfileHelper(object): self.sut.set_pkt_size(self.test_cores, pkt_size) self.sut.set_speed(self.test_cores, value) self.sut.start_all() + time.sleep(1) yield finally: self.sut.stop_all() @@ -1153,12 +1226,32 @@ class ProxProfileHelper(object): return cores + def pct_10gbps(self, percent, line_speed): + """Get rate in percent of 10 Gbps. + + Returns the rate in percent of 10 Gbps. + For instance 100.0 = 10 Gbps; 400.0 = 40 Gbps. + + This helper method isrequired when setting interface_speed option in + the testcase because NSB/PROX considers 10Gbps as 100% of line rate, + this means that the line rate must be expressed as a percentage of + 10Gbps. + + :param percent: (float) Percent of line rate (100.0 = line rate). + :param line_speed: (int) line rate speed, in bits per second. + + :return: (float) Represents the rate in percent of 10Gbps. + """ + return (percent * line_speed / ( + constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)) + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0, line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)): data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss, line_speed) - with data_helper, self.traffic_context(pkt_size, value): + with data_helper, self.traffic_context(pkt_size, + self.pct_10gbps(value, line_speed)): with data_helper.measure_tot_stats(): time.sleep(duration) # Getting statistics to calculate PPS at right speed.... @@ -1246,6 +1339,7 @@ class ProxMplsProfileHelper(ProxProfileHelper): ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20) self.sut.set_speed(self.plain_cores, value * ratio) self.sut.start_all() + time.sleep(1) yield finally: self.sut.stop_all() @@ -1417,7 +1511,8 @@ class ProxBngProfileHelper(ProxProfileHelper): data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss, line_speed) - with data_helper, self.traffic_context(pkt_size, value): + with data_helper, self.traffic_context(pkt_size, + self.pct_10gbps(value, line_speed)): with data_helper.measure_tot_stats(): time.sleep(duration) # Getting statistics to calculate PPS at right speed.... @@ -1606,7 +1701,8 @@ class ProxVpeProfileHelper(ProxProfileHelper): data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss, line_speed) - with data_helper, self.traffic_context(pkt_size, value): + with data_helper, self.traffic_context(pkt_size, + self.pct_10gbps(value, line_speed)): with data_helper.measure_tot_stats(): time.sleep(duration) # Getting statistics to calculate PPS at right speed.... @@ -1797,7 +1893,8 @@ class ProxlwAFTRProfileHelper(ProxProfileHelper): data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss, line_speed) - with data_helper, self.traffic_context(pkt_size, value): + with data_helper, self.traffic_context(pkt_size, + self.pct_10gbps(value, line_speed)): with data_helper.measure_tot_stats(): time.sleep(duration) # Getting statistics to calculate PPS at right speed.... diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py index 285e08659..839f30967 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py @@ -15,14 +15,13 @@ import errno import logging import datetime -import time - from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxDpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF from yardstick.network_services import constants +from yardstick.benchmark.contexts import base as context_base LOG = logging.getLogger(__name__) @@ -35,7 +34,8 @@ class ProxApproxVnf(SampleVNF): VNF_PROMPT = "PROX started" LUA_PARAMETER_NAME = "sut" - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = ProxDpdkVnfSetupEnvHelper @@ -44,9 +44,10 @@ class ProxApproxVnf(SampleVNF): self.prev_packets_in = 0 self.prev_packets_sent = 0 - self.prev_time = time.time() - super(ProxApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, - resource_helper_type) + self.prev_tsc = 0 + self.tsc_hz = 0 + super(ProxApproxVnf, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) def _vnf_up_post(self): self.resource_helper.up_post() @@ -68,50 +69,75 @@ class ProxApproxVnf(SampleVNF): def collect_kpi(self): # we can't get KPIs if the VNF is down - check_if_process_failed(self._vnf_process) + check_if_process_failed(self._vnf_process, 0.01) + + physical_node = context_base.Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} if self.resource_helper is None: - result = { + result.update({ "packets_in": 0, "packets_dropped": 0, "packets_fwd": 0, "collect_stats": {"core": {}}, - } + }) return result + if (self.tsc_hz == 0): + self.tsc_hz = float(self.resource_helper.sut.hz()) + LOG.debug("TSC = %f", self.tsc_hz) + if (self.tsc_hz == 0): + raise RuntimeError("Unable to retrieve TSC") + # use all_ports so we only use ports matched in topology port_count = len(self.vnfd_helper.port_pairs.all_ports) if port_count not in {1, 2, 4}: raise RuntimeError("Failed ..Invalid no of ports .. " "1, 2 or 4 ports only supported at this time") - self.port_stats = self.vnf_execute('port_stats', range(port_count)) - curr_time = time.time() + all_port_stats = self.vnf_execute('multi_port_stats', range(port_count)) + rx_total = tx_total = tsc = 0 try: - rx_total = self.port_stats[6] - tx_total = self.port_stats[7] - except IndexError: - LOG.debug("port_stats parse fail ") - # return empty dict so we don't mess up existing KPIs + for single_port_stats in all_port_stats: + rx_total = rx_total + single_port_stats[1] + tx_total = tx_total + single_port_stats[2] + tsc = tsc + single_port_stats[5] + except (TypeError, IndexError): + LOG.error("Invalid data ...") return {} - result = { + tsc = tsc / port_count + + result.update({ "packets_in": rx_total, "packets_dropped": max((tx_total - rx_total), 0), "packets_fwd": tx_total, # we share ProxResourceHelper with TG, but we want to collect # collectd KPIs here and not TG KPIs, so use a different method name "collect_stats": self.resource_helper.collect_collectd_kpi(), - } - curr_packets_in = int((rx_total - self.prev_packets_in) / (curr_time - self.prev_time)) - curr_packets_fwd = int((tx_total - self.prev_packets_sent) / (curr_time - self.prev_time)) + }) + try: + curr_packets_in = int(((rx_total - self.prev_packets_in) * self.tsc_hz) + / (tsc - self.prev_tsc)) + except ZeroDivisionError: + LOG.error("Error.... Divide by Zero") + curr_packets_in = 0 + + try: + curr_packets_fwd = int(((tx_total - self.prev_packets_sent) * self.tsc_hz) + / (tsc - self.prev_tsc)) + except ZeroDivisionError: + LOG.error("Error.... Divide by Zero") + curr_packets_fwd = 0 result["curr_packets_in"] = curr_packets_in result["curr_packets_fwd"] = curr_packets_fwd self.prev_packets_in = rx_total self.prev_packets_sent = tx_total - self.prev_time = curr_time + self.prev_tsc = tsc LOG.debug("%s collect KPIs %s %s", self.APP_NAME, datetime.datetime.now(), result) return result diff --git a/yardstick/network_services/vnf_generic/vnf/router_vnf.py b/yardstick/network_services/vnf_generic/vnf/router_vnf.py index aea27ffa6..e99de9cb3 100644 --- a/yardstick/network_services/vnf_generic/vnf/router_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/router_vnf.py @@ -34,7 +34,8 @@ class RouterVNF(SampleVNF): WAIT_TIME = 1 - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = DpdkVnfSetupEnvHelper @@ -42,12 +43,12 @@ class RouterVNF(SampleVNF): vnfd['mgmt-interface'].pop("pkey", "") vnfd['mgmt-interface']['password'] = 'password' - super(RouterVNF, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) + super(RouterVNF, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg self.context_cfg = context_cfg - self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) self.configure_routes(self.name, scenario_cfg, context_cfg) def wait_for_instantiate(self): @@ -107,8 +108,11 @@ class RouterVNF(SampleVNF): stdout = self.ssh_helper.execute(ip_link_stats)[1] link_stats = self.get_stats(stdout) # get RX/TX from link_stats and assign to results + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) result = { + "physical_node": physical_node, "packets_in": 0, "packets_dropped": 0, "packets_fwd": 0, diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 16873611e..3ef7c33c5 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -11,15 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Base class implementation for generic vnf implementation """ -from collections import Mapping import logging from multiprocessing import Queue, Value, Process - import os import posixpath import re +import uuid import subprocess import time @@ -32,6 +30,7 @@ from yardstick.benchmark.contexts.base import Context from yardstick.common import exceptions as y_exceptions from yardstick.common.process import check_if_process_failed from yardstick.common import utils +from yardstick.common import yaml_loader from yardstick.network_services import constants from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig @@ -41,7 +40,7 @@ from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper - +from yardstick.benchmark.contexts.node import NodeContext LOG = logging.getLogger(__name__) @@ -144,6 +143,13 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): 'vnf_type': self.VNF_TYPE, } + # read actions/rules from file + acl_options = None + acl_file_name = self.scenario_helper.options.get('rules') + if acl_file_name: + with utils.open_relative_file(acl_file_name, task_path) as infile: + acl_options = yaml_loader.yaml_load(infile) + config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path) config_basename = posixpath.basename(self.CFG_CONFIG) @@ -176,12 +182,17 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): new_config = self._update_packet_type(new_config, traffic_options) self.ssh_helper.upload_config_file(config_basename, new_config) self.ssh_helper.upload_config_file(script_basename, - multiport.generate_script(self.vnfd_helper)) + multiport.generate_script(self.vnfd_helper, + self.get_flows_config(acl_options))) LOG.info("Provision and start the %s", self.APP_NAME) self._build_pipeline_kwargs() return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs) + def get_flows_config(self, options=None): # pylint: disable=unused-argument + """No actions/rules (flows) by default""" + return None + def _build_pipeline_kwargs(self): tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) # count the number of actual ports in the list of pairs @@ -309,6 +320,7 @@ class ResourceHelper(object): self.resource = None self.setup_helper = setup_helper self.ssh_helper = setup_helper.ssh_helper + self._enable = True def setup(self): self.resource = self.setup_helper.setup_vnf_environment() @@ -316,22 +328,33 @@ class ResourceHelper(object): def generate_cfg(self): pass + def update_from_context(self, context, attr_name): + """Disable resource helper in case of baremetal context. + + And update appropriate node collectd options in context + """ + if isinstance(context, NodeContext): + self._enable = False + context.update_collectd_options_for_node(self.setup_helper.collectd_options, + attr_name) + def _collect_resource_kpi(self): result = {} status = self.resource.check_if_system_agent_running("collectd")[0] - if status == 0: + if status == 0 and self._enable: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} return result def start_collect(self): - self.resource.initiate_systemagent(self.ssh_helper.bin_path) - self.resource.start() - self.resource.amqp_process_for_nfvi_kpi() + if self._enable: + self.resource.initiate_systemagent(self.ssh_helper.bin_path) + self.resource.start() + self.resource.amqp_process_for_nfvi_kpi() def stop_collect(self): - if self.resource: + if self.resource and self._enable: self.resource.stop() def collect_kpi(self): @@ -375,48 +398,24 @@ class ClientResourceHelper(ResourceHelper): LOG.error('TRex client not connected') return {} - def generate_samples(self, ports, key=None, default=None): - # needs to be used ports - last_result = self.get_stats(ports) - key_value = last_result.get(key, default) - - if not isinstance(last_result, Mapping): # added for mock unit test - self._terminated.value = 1 - return {} - - samples = {} - # recalculate port for interface and see if it matches ports provided - for intf in self.vnfd_helper.interfaces: - name = intf["name"] - port = self.vnfd_helper.port_num(name) - if port in ports: - xe_value = last_result.get(port, {}) - samples[name] = { - "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), - "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), - "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), - "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), - "in_packets": int(xe_value.get("ipackets", 0)), - "out_packets": int(xe_value.get("opackets", 0)), - } - if key: - samples[name][key] = key_value - return samples + def _get_samples(self, ports, port_pg_id=False): + raise NotImplementedError() def _run_traffic_once(self, traffic_profile): traffic_profile.execute_traffic(self) self.client_started.value = 1 time.sleep(self.RUN_DURATION) - samples = self.generate_samples(traffic_profile.ports) + samples = self._get_samples(traffic_profile.ports) time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, mq_producer): # if we don't do this we can hang waiting for the queue to drain # have to do this in the subprocess self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path + mq_producer.tg_method_started() try: self._build_ports() self.client = self._connect() @@ -424,8 +423,11 @@ class ClientResourceHelper(ResourceHelper): self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) + iteration_index = 0 while self._terminated.value == 0: + iteration_index += 1 self._run_traffic_once(traffic_profile) + mq_producer.tg_method_iteration(iteration_index) self.client.stop(self.all_ports) self.client.disconnect() @@ -436,6 +438,8 @@ class ClientResourceHelper(ResourceHelper): return # return if trex/tg server is stopped. raise + mq_producer.tg_method_finished() + def terminate(self): self._terminated.value = 1 # stop client @@ -615,6 +619,7 @@ class ScenarioHelper(object): test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT) return test_duration if test_duration > test_timeout else test_timeout + class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ @@ -624,8 +629,9 @@ class SampleVNF(GenericVNF): APP_NAME = "SampleVNF" # we run the VNF interactively, so the ssh command will timeout after this long - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - super(SampleVNF, self).__init__(name, vnfd) + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + super(SampleVNF, self).__init__(name, vnfd, task_id) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -646,7 +652,6 @@ class SampleVNF(GenericVNF): self.resource_helper = resource_helper_type(self.setup_helper) self.context_cfg = None - self.nfvi_context = None self.pipeline_kwargs = {} self.uplink_ports = None self.downlink_ports = None @@ -673,8 +678,10 @@ class SampleVNF(GenericVNF): self._update_collectd_options(scenario_cfg, context_cfg) self.scenario_helper.scenario_cfg = scenario_cfg self.context_cfg = context_cfg - self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) - # self.nfvi_context = None + self.resource_helper.update_from_context( + Context.get_context_from_server(self.scenario_helper.nodes[self.name]), + self.scenario_helper.nodes[self.name] + ) # vnf deploy is unsupported, use ansible playbooks if self.scenario_helper.options.get("vnf_deploy", False): @@ -825,18 +832,21 @@ class SampleVNF(GenericVNF): def collect_kpi(self): # we can't get KPIs if the VNF is down - check_if_process_failed(self._vnf_process) + check_if_process_failed(self._vnf_process, 0.01) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} if m: - result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()} + result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}) result["collect_stats"] = self.resource_helper.collect_kpi() else: - result = { - "packets_in": 0, - "packets_fwd": 0, - "packets_dropped": 0, - } + result.update({"packets_in": 0, + "packets_fwd": 0, + "packets_dropped": 0}) + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -852,8 +862,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): APP_NAME = 'Sample' RUN_WAIT = 1 - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - super(SampleVNFTrafficGen, self).__init__(name, vnfd) + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -882,6 +893,11 @@ class SampleVNFTrafficGen(GenericTrafficGen): def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg + self.resource_helper.update_from_context( + Context.get_context_from_server(self.scenario_helper.nodes[self.name]), + self.scenario_helper.nodes[self.name] + ) + self.resource_helper.setup() # must generate_cfg after DPDK bind because we need port number self.resource_helper.generate_cfg() @@ -905,12 +921,13 @@ class SampleVNFTrafficGen(GenericTrafficGen): LOG.info("%s TG Server is up and running.", self.APP_NAME) return self._tg_process.exitcode - def _traffic_runner(self, traffic_profile): + def _traffic_runner(self, traffic_profile, mq_id): # always drop connections first thing in new processes # so we don't get paramiko errors self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) - self.resource_helper.run_traffic(traffic_profile) + self._mq_producer = self._setup_mq_producer(mq_id) + self.resource_helper.run_traffic(traffic_profile, self._mq_producer) def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. @@ -920,10 +937,12 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, + traffic_profile.__class__.__name__, os.getpid()) - self._traffic_process = Process(name=name, target=self._traffic_runner, - args=(traffic_profile,)) + self._traffic_process = Process( + name=name, target=self._traffic_runner, + args=(traffic_profile, uuid.uuid1().int)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: @@ -932,13 +951,16 @@ class SampleVNFTrafficGen(GenericTrafficGen): if not self._traffic_process.is_alive(): break - return self._traffic_process.is_alive() - def collect_kpi(self): # check if the tg processes have exited + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} for proc in (self._tg_process, self._traffic_process): check_if_process_failed(proc) - result = self.resource_helper.collect_kpi() + + result["collect_stats"] = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ixload.py b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py index 02e7803f7..e0fc47dbf 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ixload.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py @@ -12,15 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import +import collections import csv import glob import logging import os import shutil - -from collections import OrderedDict -from subprocess import call +import subprocess from yardstick.common import utils from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen @@ -65,7 +63,7 @@ class IxLoadResourceHelper(ClientResourceHelper): RESULTS_MOUNT = "/mnt/Results" - KPI_LIST = OrderedDict(( + KPI_LIST = collections.OrderedDict(( ('http_throughput', 'HTTP Total Throughput (Kbps)'), ('simulated_users', 'HTTP Simulated Users'), ('concurrent_connections', 'HTTP Concurrent Connections'), @@ -75,7 +73,8 @@ class IxLoadResourceHelper(ClientResourceHelper): def __init__(self, setup_helper): super(IxLoadResourceHelper, self).__init__(setup_helper) - self.result = OrderedDict((key, ResourceDataHelper()) for key in self.KPI_LIST) + self.result = collections.OrderedDict((key, ResourceDataHelper()) + for key in self.KPI_LIST) self.resource_file_name = '' self.data = None @@ -101,7 +100,7 @@ class IxLoadResourceHelper(ClientResourceHelper): LOG.debug(cmd) if not os.path.ismount(self.RESULTS_MOUNT): - call(cmd, shell=True) + subprocess.call(cmd, shell=True) shutil.rmtree(self.RESULTS_MOUNT, ignore_errors=True) utils.makedirs(self.RESULTS_MOUNT) @@ -124,12 +123,13 @@ class IxLoadResourceHelper(ClientResourceHelper): class IxLoadTrafficGen(SampleVNFTrafficGen): - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if resource_helper_type is None: resource_helper_type = IxLoadResourceHelper - super(IxLoadTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, - resource_helper_type) + super(IxLoadTrafficGen, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) self._result = {} def run_traffic(self, traffic_profile): @@ -157,7 +157,7 @@ class IxLoadTrafficGen(SampleVNFTrafficGen): args="'%s'" % ixload_config) LOG.debug(cmd) - call(cmd, shell=True) + subprocess.call(cmd, shell=True) with open(self.ssh_helper.join_bin_path("ixLoad_HTTP_Client.csv")) as csv_file: lines = csv_file.readlines()[10:] @@ -172,5 +172,5 @@ class IxLoadTrafficGen(SampleVNFTrafficGen): self.resource_helper.data = self.resource_helper.make_aggregates() def terminate(self): - call(["pkill", "-9", "http_ixload.py"]) + subprocess.call(["pkill", "-9", "http_ixload.py"]) super(IxLoadTrafficGen, self).terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py index a989543f5..a3b5afa39 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py @@ -71,7 +71,7 @@ class PingResourceHelper(ClientResourceHelper): self._queue = Queue() self._parser = PingParser(self._queue) - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, *args): # drop the connection in order to force a new one self.ssh_helper.drop_connection() @@ -103,14 +103,14 @@ class PingTrafficGen(SampleVNFTrafficGen): APP_NAME = 'Ping' RUN_WAIT = 4 - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = PingSetupEnvHelper if resource_helper_type is None: resource_helper_type = PingResourceHelper - - super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, - resource_helper_type) + super(PingTrafficGen, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) self._result = {} def _check_status(self): diff --git a/yardstick/network_services/vnf_generic/vnf/tg_prox.py b/yardstick/network_services/vnf_generic/vnf/tg_prox.py index 282dd92c5..854319a21 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_prox.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_prox.py @@ -30,9 +30,11 @@ class ProxTrafficGen(SampleVNFTrafficGen): LUA_PARAMETER_NAME = "gen" WAIT_TIME = 1 - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): # don't call superclass, use custom wrapper of ProxApproxVnf - self._vnf_wrapper = ProxApproxVnf(name, vnfd, setup_env_helper_type, resource_helper_type) + self._vnf_wrapper = ProxApproxVnf( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) self.bin_path = get_nsb_option('bin_path', '') self.name = self._vnf_wrapper.name self.ssh_helper = self._vnf_wrapper.ssh_helper diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py index 265d0b7a9..4d3bc2ce5 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py @@ -12,31 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import - -import time -import os import logging -import sys from yardstick.common import utils -from yardstick import error +from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper + LOG = logging.getLogger(__name__) WAIT_AFTER_CFG_LOAD = 10 WAIT_FOR_TRAFFIC = 30 -IXIA_LIB = os.path.dirname(os.path.realpath(__file__)) -IXNET_LIB = os.path.join(IXIA_LIB, "../../libs/ixia_libs/IxNet") -sys.path.append(IXNET_LIB) - -try: - from IxNet import IxNextgen -except ImportError: - IxNextgen = error.ErrorClass class IxiaRfc2544Helper(Rfc2544ResourceHelper): @@ -53,7 +41,7 @@ class IxiaResourceHelper(ClientResourceHelper): super(IxiaResourceHelper, self).__init__(setup_helper) self.scenario_helper = setup_helper.scenario_helper - self.client = IxNextgen() + self.client = ixnet_api.IxNextgen() if rfc_helper_type is None: rfc_helper_type = IxiaRfc2544Helper @@ -64,20 +52,16 @@ class IxiaResourceHelper(ClientResourceHelper): self._connect() def _connect(self, client=None): - self.client._connect(self.vnfd_helper) + self.client.connect(self.vnfd_helper) def get_stats(self, *args, **kwargs): - return self.client.ix_get_statistics() + return self.client.get_statistics() def stop_collect(self): self._terminated.value = 1 - if self.client: - self.client.ix_stop_traffic() - def generate_samples(self, ports, key=None, default=None): + def generate_samples(self, ports, key=None): stats = self.get_stats() - last_result = stats[1] - latency = stats[0] samples = {} # this is not DPDK port num, but this is whatever number we gave @@ -88,19 +72,21 @@ class IxiaResourceHelper(ClientResourceHelper): intf = self.vnfd_helper.find_interface_by_port(port_num) port_name = intf["name"] samples[port_name] = { - "rx_throughput_kps": float(last_result["Rx_Rate_Kbps"][port_num]), - "tx_throughput_kps": float(last_result["Tx_Rate_Kbps"][port_num]), - "rx_throughput_mbps": float(last_result["Rx_Rate_Mbps"][port_num]), - "tx_throughput_mbps": float(last_result["Tx_Rate_Mbps"][port_num]), - "in_packets": int(last_result["Valid_Frames_Rx"][port_num]), - "out_packets": int(last_result["Frames_Tx"][port_num]), - "RxThroughput": int(last_result["Valid_Frames_Rx"][port_num]) / 30, - "TxThroughput": int(last_result["Frames_Tx"][port_num]) / 30, + "rx_throughput_kps": float(stats["Rx_Rate_Kbps"][port_num]), + "tx_throughput_kps": float(stats["Tx_Rate_Kbps"][port_num]), + "rx_throughput_mbps": float(stats["Rx_Rate_Mbps"][port_num]), + "tx_throughput_mbps": float(stats["Tx_Rate_Mbps"][port_num]), + "in_packets": int(stats["Valid_Frames_Rx"][port_num]), + "out_packets": int(stats["Frames_Tx"][port_num]), + # NOTE(ralonsoh): we need to make the traffic injection + # time variable. + "RxThroughput": int(stats["Valid_Frames_Rx"][port_num]) / 30, + "TxThroughput": int(stats["Frames_Tx"][port_num]) / 30, } if key: - avg_latency = latency["Store-Forward_Avg_latency_ns"][port_num] - min_latency = latency["Store-Forward_Min_latency_ns"][port_num] - max_latency = latency["Store-Forward_Max_latency_ns"][port_num] + avg_latency = stats["Store-Forward_Avg_latency_ns"][port_num] + min_latency = stats["Store-Forward_Min_latency_ns"][port_num] + max_latency = stats["Store-Forward_Max_latency_ns"][port_num] samples[port_name][key] = \ {"Store-Forward_Avg_latency_ns": avg_latency, "Store-Forward_Min_latency_ns": min_latency, @@ -110,7 +96,13 @@ class IxiaResourceHelper(ClientResourceHelper): return samples - def run_traffic(self, traffic_profile): + def _initialize_client(self): + """Initialize the IXIA IxNetwork client and configure the server""" + self.client.clear_config() + self.client.assign_ports() + self.client.create_traffic_model() + + def run_traffic(self, traffic_profile, *args): if self._terminated.value: return @@ -119,16 +111,7 @@ class IxiaResourceHelper(ClientResourceHelper): default = "00:00:00:00:00:00" self._build_ports() - - # we don't know client_file_name until runtime as instantiate - client_file_name = \ - utils.find_relative_file( - self.scenario_helper.scenario_cfg['ixia_profile'], - self.scenario_helper.scenario_cfg["task_path"]) - self.client.ix_load_config(client_file_name) - time.sleep(WAIT_AFTER_CFG_LOAD) - - self.client.ix_assign_ports() + self._initialize_client() mac = {} for port_name in self.vnfd_helper.port_pairs.all_ports: @@ -140,43 +123,28 @@ class IxiaResourceHelper(ClientResourceHelper): mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default) mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default) - samples = {} - # Generate ixia traffic config... try: while not self._terminated.value: - traffic_profile.execute_traffic(self, self.client, mac) + first_run = traffic_profile.execute_traffic( + self, self.client, mac) self.client_started.value = 1 - time.sleep(WAIT_FOR_TRAFFIC) - self.client.ix_stop_traffic() + # pylint: disable=unnecessary-lambda + utils.wait_until_true(lambda: self.client.is_traffic_stopped()) samples = self.generate_samples(traffic_profile.ports) + + # NOTE(ralonsoh): the traffic injection duration is fixed to 30 + # seconds. This parameter is configurable and must be retrieved + # from the traffic_profile.full_profile information. + # Every flow must have the same duration. + completed, samples = traffic_profile.get_drop_percentage( + samples, min_tol, max_tol, first_run=first_run) self._queue.put(samples) - status, samples = traffic_profile.get_drop_percentage(samples, min_tol, - max_tol, self.client, mac) - current = samples['CurrentDropPercentage'] - if min_tol <= current <= max_tol or status == 'Completed': + if completed: self._terminated.value = 1 - self.client.ix_stop_traffic() - self._queue.put(samples) - - if not self.rfc_helper.is_done(): - self._terminated.value = 1 - return - - traffic_profile.execute_traffic(self, self.client, mac) - for _ in range(5): - time.sleep(self.LATENCY_TIME_SLEEP) - self.client.ix_stop_traffic() - samples = self.generate_samples(traffic_profile.ports, 'latency', {}) - self._queue.put(samples) - traffic_profile.start_ixia_latency(self, self.client, mac) - if self._terminated.value: - break - - self.client.ix_stop_traffic() except Exception: # pylint: disable=broad-except - LOG.exception("Run Traffic terminated") + LOG.exception('Run Traffic terminated') self._terminated.value = 1 @@ -189,12 +157,12 @@ class IxiaTrafficGen(SampleVNFTrafficGen): APP_NAME = 'Ixia' - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if resource_helper_type is None: resource_helper_type = IxiaResourceHelper - - super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, - resource_helper_type) + super(IxiaTrafficGen, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) self._ixia_traffic_gen = None self.ixia_file_name = '' self.vnf_port_pairs = [] diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py index 4e9f4bdc1..cdbb41485 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py @@ -11,74 +11,45 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Trex traffic generation definitions which implements rfc2544 """ -from __future__ import absolute_import -from __future__ import print_function -import time import logging -from collections import Mapping - -from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen -from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper -from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper - -LOGGING = logging.getLogger(__name__) +import time +from yardstick.common import utils +from yardstick.network_services.vnf_generic.vnf import sample_vnf +from yardstick.network_services.vnf_generic.vnf import tg_trex -class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper): - def is_done(self): - return self.latency and self.iteration.value > 10 +LOGGING = logging.getLogger(__name__) -class TrexRfcResourceHelper(TrexResourceHelper): +class TrexRfcResourceHelper(tg_trex.TrexResourceHelper): - LATENCY_TIME_SLEEP = 120 - RUN_DURATION = 30 - WAIT_TIME = 3 + SAMPLING_PERIOD = 2 + TRANSIENT_PERIOD = 10 - def __init__(self, setup_helper, rfc_helper_type=None): + def __init__(self, setup_helper): super(TrexRfcResourceHelper, self).__init__(setup_helper) - - if rfc_helper_type is None: - rfc_helper_type = TrexRfc2544ResourceHelper - - self.rfc2544_helper = rfc_helper_type(self.scenario_helper) + self.rfc2544_helper = sample_vnf.Rfc2544ResourceHelper( + self.scenario_helper) def _run_traffic_once(self, traffic_profile): - if self._terminated.value: - return - - traffic_profile.execute_traffic(self) self.client_started.value = 1 - time.sleep(self.RUN_DURATION) - self.client.stop(traffic_profile.ports) - time.sleep(self.WAIT_TIME) - samples = traffic_profile.get_drop_percentage(self) - self._queue.put(samples) - - if not self.rfc2544_helper.is_done(): - return - - self.client.stop(traffic_profile.ports) - self.client.reset(ports=traffic_profile.ports) - self.client.remove_all_streams(traffic_profile.ports) - traffic_profile.execute_traffic_latency(samples=samples) - multiplier = traffic_profile.calculate_pps(samples)[1] - for _ in range(5): - time.sleep(self.LATENCY_TIME_SLEEP) - self.client.stop(traffic_profile.ports) - time.sleep(self.WAIT_TIME) - last_res = self.client.get_stats(traffic_profile.ports) - if not isinstance(last_res, Mapping): - self._terminated.value = 1 - continue - self.generate_samples(traffic_profile.ports, 'latency', {}) - self._queue.put(samples) - self.client.start(mult=str(multiplier), - ports=traffic_profile.ports, - duration=120, force=True) + ports, port_pg_id = traffic_profile.execute_traffic(self) + + samples = [] + timeout = int(traffic_profile.config.duration) - self.TRANSIENT_PERIOD + time.sleep(self.TRANSIENT_PERIOD) + for _ in utils.Timer(timeout=timeout): + samples.append(self._get_samples(ports, port_pg_id=port_pg_id)) + time.sleep(self.SAMPLING_PERIOD) + + traffic_profile.stop_traffic(self) + output = traffic_profile.get_drop_percentage( + samples, self.rfc2544_helper.tolerance_low, + self.rfc2544_helper.tolerance_high, + self.rfc2544_helper.correlated_traffic) + self._queue.put(output) def start_client(self, ports, mult=None, duration=None, force=True): self.client.start(ports=ports, mult=mult, duration=duration, force=force) @@ -86,20 +57,16 @@ class TrexRfcResourceHelper(TrexResourceHelper): def clear_client_stats(self, ports): self.client.clear_stats(ports=ports) - def collect_kpi(self): - self.rfc2544_helper.iteration.value += 1 - return super(TrexRfcResourceHelper, self).collect_kpi() - -class TrexTrafficGenRFC(TrexTrafficGen): +class TrexTrafficGenRFC(tg_trex.TrexTrafficGen): """ This class handles mapping traffic profile and generating traffic for rfc2544 testcase. """ - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if resource_helper_type is None: resource_helper_type = TrexRfcResourceHelper - - super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type, - resource_helper_type) + super(TrexTrafficGenRFC, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py index 0084a124c..58b73488b 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py @@ -13,7 +13,6 @@ # limitations under the License. """ Trex acts as traffic generation and vnf definitions based on IETS Spec """ -from __future__ import absolute_import import logging import os @@ -25,6 +24,7 @@ from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTraff from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper + LOG = logging.getLogger(__name__) @@ -165,6 +165,30 @@ class TrexResourceHelper(ClientResourceHelper): cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1" self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT)) + def _get_samples(self, ports, port_pg_id=None): + stats = self.get_stats(ports) + samples = {} + for pname in (intf['name'] for intf in self.vnfd_helper.interfaces): + port_num = self.vnfd_helper.port_num(pname) + port_stats = stats.get(port_num, {}) + samples[pname] = { + 'rx_throughput_fps': float(port_stats.get('rx_pps', 0.0)), + 'tx_throughput_fps': float(port_stats.get('tx_pps', 0.0)), + 'rx_throughput_bps': float(port_stats.get('rx_bps', 0.0)), + 'tx_throughput_bps': float(port_stats.get('tx_bps', 0.0)), + 'in_packets': int(port_stats.get('ipackets', 0)), + 'out_packets': int(port_stats.get('opackets', 0)), + } + + pg_id_list = port_pg_id.get_pg_ids(port_num) + samples[pname]['latency'] = {} + for pg_id in pg_id_list: + latency_global = stats.get('latency', {}) + pg_latency = latency_global.get(pg_id, {}).get('latency') + samples[pname]['latency'][pg_id] = pg_latency + + return samples + class TrexTrafficGen(SampleVNFTrafficGen): """ @@ -174,15 +198,14 @@ class TrexTrafficGen(SampleVNFTrafficGen): APP_NAME = 'TRex' - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if resource_helper_type is None: resource_helper_type = TrexResourceHelper - if setup_env_helper_type is None: setup_env_helper_type = TrexDpdkVnfSetupEnvHelper - - super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, - resource_helper_type) + super(TrexTrafficGen, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) def _check_status(self): return self.resource_helper.check_status() diff --git a/yardstick/network_services/vnf_generic/vnf/udp_replay.py b/yardstick/network_services/vnf_generic/vnf/udp_replay.py index a57f53bc7..e3fde1a79 100644 --- a/yardstick/network_services/vnf_generic/vnf/udp_replay.py +++ b/yardstick/network_services/vnf_generic/vnf/udp_replay.py @@ -19,7 +19,7 @@ from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper - +from yardstick.benchmark.contexts import base as ctx_base LOG = logging.getLogger(__name__) @@ -60,15 +60,14 @@ class UdpReplayApproxVnf(SampleVNF): PIPELINE_COMMAND = REPLAY_PIPELINE_COMMAND - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if resource_helper_type is None: resource_helper_type = UdpReplayResourceHelper - if setup_env_helper_type is None: setup_env_helper_type = UdpReplaySetupEnvHelper - - super(UdpReplayApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, - resource_helper_type) + super(UdpReplayApproxVnf, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) def _build_pipeline_kwargs(self): ports = self.vnfd_helper.port_pairs.all_ports @@ -79,9 +78,11 @@ class UdpReplayApproxVnf(SampleVNF): ports_mask_hex = hex(sum(2 ** num for num in port_nums)) # one core extra for master cpu_mask_hex = hex(2 ** (number_of_ports + 1) - 1) + nfvi_context = ctx_base.Context.get_context_from_server( + self.scenario_helper.nodes[self.name]) hw_csum = "" if (not self.scenario_helper.options.get('hw_csum', False) or - self.nfvi_context.attrs.get('nfvi_type') not in self.HW_OFFLOADING_NFVI_TYPES): + nfvi_context.attrs.get('nfvi_type') not in self.HW_OFFLOADING_NFVI_TYPES): hw_csum = '--no-hw-csum' # tuples of (FLD_PORT, FLD_QUEUE, FLD_LCORE) @@ -116,7 +117,12 @@ class UdpReplayApproxVnf(SampleVNF): stats = self.get_stats() stats_words = stats.split() split_stats = stats_words[stats_words.index('0'):][:number_of_ports * 5] + + physical_node = ctx_base.Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + result = { + "physical_node": physical_node, "packets_in": get_sum(1), "packets_fwd": get_sum(2), "packets_dropped": get_sum(3) + get_sum(4), diff --git a/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py index 3ba1f91b7..a1523dee3 100644 --- a/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py @@ -14,9 +14,8 @@ import logging -from yardstick.common import utils -from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper -from yardstick.network_services.yang_model import YangModel +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF +from yardstick.network_services.vnf_generic.vnf.acl_vnf import AclApproxSetupEnvSetupEnvHelper LOG = logging.getLogger(__name__) @@ -27,7 +26,7 @@ FW_COLLECT_KPI = (r"""VFW TOTAL:[^p]+pkts_received"?:\s(\d+),[^p]+pkts_fw_forwar r"""[^p]+pkts_drop_fw"?:\s(\d+),\s""") -class FWApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): +class FWApproxSetupEnvHelper(AclApproxSetupEnvSetupEnvHelper): APP_NAME = "vFW" CFG_CONFIG = "/tmp/vfw_config" @@ -37,6 +36,8 @@ class FWApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): SW_DEFAULT_CORE = 5 HW_DEFAULT_CORE = 2 VNF_TYPE = "VFW" + RULE_CMD = "vfw" + DEFAULT_FWD_ACTIONS = ["accept", "count", "conntrack"] class FWApproxVnf(SampleVNF): @@ -51,17 +52,9 @@ class FWApproxVnf(SampleVNF): 'packets_dropped': 3, } - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = FWApproxSetupEnvHelper - - super(FWApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) - self.vfw_rules = None - - def _start_vnf(self): - yang_model_path = utils.find_relative_file( - self.scenario_helper.options['rules'], - self.scenario_helper.task_path) - yang_model = YangModel(yang_model_path) - self.vfw_rules = yang_model.get_rules() - super(FWApproxVnf, self)._start_vnf() + super(FWApproxVnf, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) diff --git a/yardstick/network_services/vnf_generic/vnf/vnf_ssh_helper.py b/yardstick/network_services/vnf_generic/vnf/vnf_ssh_helper.py index de6fd9329..6c5c6c833 100644 --- a/yardstick/network_services/vnf_generic/vnf/vnf_ssh_helper.py +++ b/yardstick/network_services/vnf_generic/vnf/vnf_ssh_helper.py @@ -47,6 +47,7 @@ class VnfSshHelper(AutoConnectSSH): def upload_config_file(self, prefix, content): cfg_file = os.path.join(constants.REMOTE_TMP, prefix) + LOG.debug('Config file name: %s', cfg_file) LOG.debug(content) file_obj = StringIO(content) self.put_file_obj(file_obj, cfg_file) diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index 0067f6bf9..b7cf8b35e 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -28,6 +28,7 @@ from yardstick.common.process import check_if_process_failed from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.pipeline import PipelineRules from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper +from yardstick.benchmark.contexts import base as ctx_base LOG = logging.getLogger(__name__) @@ -115,7 +116,8 @@ class ConfigCreate(object): pktq = "SWQ{0}{1}".format(self.sw_q, sink) return pktq - def vpe_upstream(self, vnf_cfg, index=0): + def vpe_upstream(self, vnf_cfg, index=0): # pragma: no cover + # NOTE(ralonsoh): this function must be covered in UTs. parser = configparser.ConfigParser() parser.read(os.path.join(vnf_cfg, 'vpe_upstream')) @@ -147,7 +149,8 @@ class ConfigCreate(object): self.n_pipeline += 1 return parser - def vpe_downstream(self, vnf_cfg, index): + def vpe_downstream(self, vnf_cfg, index): # pragma: no cover + # NOTE(ralonsoh): this function must be covered in UTs. parser = configparser.ConfigParser() parser.read(os.path.join(vnf_cfg, 'vpe_downstream')) for pipeline in parser.sections(): @@ -288,11 +291,12 @@ class VpeApproxVnf(SampleVNF): COLLECT_KPI = VPE_COLLECT_KPI WAIT_TIME = 20 - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = VpeApproxSetupEnvHelper - - super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) + super(VpeApproxVnf, self).__init__( + name, vnfd, task_id, setup_env_helper_type, resource_helper_type) def get_stats(self, *args, **kwargs): raise NotImplementedError @@ -300,7 +304,11 @@ class VpeApproxVnf(SampleVNF): def collect_kpi(self): # we can't get KPIs if the VNF is down check_if_process_failed(self._vnf_process) + physical_node = ctx_base.Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + result = { + "physical_node": physical_node, 'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0, 'pkt_in_down_stream': 0, diff --git a/yardstick/network_services/yang_model.py b/yardstick/network_services/yang_model.py deleted file mode 100644 index ec00c4513..000000000 --- a/yardstick/network_services/yang_model.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright (c) 2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import absolute_import -from __future__ import print_function -import logging -import ipaddress -import six - -from yardstick.common.yaml_loader import yaml_load - -LOG = logging.getLogger(__name__) - - -class YangModel(object): - - RULE_TEMPLATE = "p acl add 1 {0} {1} {2} {3} {4} {5} {6} {7} 0 0 {8}" - - def __init__(self, config_file): - super(YangModel, self).__init__() - self._config_file = config_file - self._options = {} - self._rules = '' - - @property - def config_file(self): - return self._config_file - - @config_file.setter - def config_file(self, value): - self._config_file = value - self._options = {} - self._rules = '' - - def _read_config(self): - # TODO: add some error handling in case of empty or non-existing file - try: - with open(self._config_file) as f: - self._options = yaml_load(f) - except Exception as e: - LOG.exception("Failed to load the yaml %s", e) - raise - - def _get_entries(self): - if not self._options: - return '' - - rule_list = [] - for ace in self._options['access-list1']['acl']['access-list-entries']: - # TODO: resolve ports using topology file and nodes' - # ids: public or private. - matches = ace['ace']['matches'] - dst_ipv4_net = matches['destination-ipv4-network'] - dst_ipv4_net_ip = ipaddress.ip_interface(six.text_type(dst_ipv4_net)) - port0_local_network = dst_ipv4_net_ip.network.network_address.exploded - port0_prefix = dst_ipv4_net_ip.network.prefixlen - - src_ipv4_net = matches['source-ipv4-network'] - src_ipv4_net_ip = ipaddress.ip_interface(six.text_type(src_ipv4_net)) - port1_local_network = src_ipv4_net_ip.network.network_address.exploded - port1_prefix = src_ipv4_net_ip.network.prefixlen - - lower_dport = matches['destination-port-range']['lower-port'] - upper_dport = matches['destination-port-range']['upper-port'] - - lower_sport = matches['source-port-range']['lower-port'] - upper_sport = matches['source-port-range']['upper-port'] - - # TODO: proto should be read from file also. - # Now all rules in sample ACL file are TCP. - rule_list.append('') # get an extra new line - rule_list.append(self.RULE_TEMPLATE.format(port0_local_network, - port0_prefix, - port1_local_network, - port1_prefix, - lower_dport, - upper_dport, - lower_sport, - upper_sport, - 0)) - rule_list.append(self.RULE_TEMPLATE.format(port1_local_network, - port1_prefix, - port0_local_network, - port0_prefix, - lower_sport, - upper_sport, - lower_dport, - upper_dport, - 1)) - - self._rules = '\n'.join(rule_list) - - def get_rules(self): - if not self._rules: - self._read_config() - self._get_entries() - return self._rules |