diff options
Diffstat (limited to 'yardstick/network_services')
16 files changed, 1904 insertions, 232 deletions
diff --git a/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py b/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py index 8274ff9ce..d4f75babb 100644 --- a/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py +++ b/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2018 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -50,6 +50,20 @@ TRAFFIC_STATUS_STOPPED = 'stopped' SUPPORTED_PROTO = [PROTO_UDP] +class Vlan(object): + def __init__(self, + vlan_id, vlan_id_step=None, vlan_id_direction='increment', + prio=None, prio_step=None, prio_direction='increment', + tp_id=None): + self.vlan_id = vlan_id + self.vlan_id_step = vlan_id_step + self.vlan_id_direction = vlan_id_direction + self.prio = prio + self.prio_step = prio_step + self.prio_direction = prio_direction + self.tp_id = tp_id + + # NOTE(ralonsoh): this pragma will be removed in the last patch of this series class IxNextgen(object): # pragma: no cover @@ -107,6 +121,11 @@ class IxNextgen(object): # pragma: no cover return self._ixnet raise exceptions.IxNetworkClientNotConnected() + def get_vports(self): + """Return the list of assigned ports (vport objects)""" + vports = self.ixnet.getList(self.ixnet.getRoot(), 'vport') + return vports + def _get_config_element_by_flow_group_name(self, flow_group_name): """Get a config element using the flow group name @@ -225,15 +244,20 @@ class IxNextgen(object): # pragma: no cover zip(self._cfg['cards'], self._cfg['ports'])] log.info('Create and assign vports: %s', ports) - for port in ports: - vport = self.ixnet.add(self.ixnet.getRoot(), 'vport') - self.ixnet.commit() - self.ixnet.execute('assignPorts', [port], [], [vport], True) + + vports = [] + for _ in ports: + vports.append(self.ixnet.add(self.ixnet.getRoot(), 'vport')) self.ixnet.commit() + + self.ixnet.execute('assignPorts', ports, [], vports, True) + self.ixnet.commit() + + for vport in vports: if self.ixnet.getAttribute(vport, '-state') != 'up': log.warning('Port %s is down', vport) - def _create_traffic_item(self): + def _create_traffic_item(self, traffic_type='raw'): """Create the traffic item to hold the flow groups The traffic item tracking by "Traffic Item" is enabled to retrieve the @@ -243,7 +267,7 @@ class IxNextgen(object): # pragma: no cover traffic_item = self.ixnet.add(self.ixnet.getRoot() + '/traffic', 'trafficItem') self.ixnet.setMultiAttribute(traffic_item, '-name', 'RFC2544', - '-trafficType', 'raw') + '-trafficType', traffic_type) self.ixnet.commit() traffic_item_id = self.ixnet.remapIds(traffic_item)[0] @@ -251,27 +275,25 @@ class IxNextgen(object): # pragma: no cover '-trackBy', 'trafficGroupId0') self.ixnet.commit() - def _create_flow_groups(self): - """Create the flow groups between the assigned ports""" + def _create_flow_groups(self, uplink, downlink): + """Create the flow groups between the endpoints""" traffic_item_id = self.ixnet.getList(self.ixnet.getRoot() + 'traffic', 'trafficItem')[0] log.info('Create the flow groups') - vports = self.ixnet.getList(self.ixnet.getRoot(), 'vport') - uplink_ports = vports[::2] - downlink_ports = vports[1::2] + index = 0 - for up, down in zip(uplink_ports, downlink_ports): + for up, down in zip(uplink, downlink): log.info('FGs: %s <--> %s', up, down) endpoint_set_1 = self.ixnet.add(traffic_item_id, 'endpointSet') endpoint_set_2 = self.ixnet.add(traffic_item_id, 'endpointSet') self.ixnet.setMultiAttribute( endpoint_set_1, '-name', str(index + 1), - '-sources', [up + '/protocols'], - '-destinations', [down + '/protocols']) + '-sources', [up], + '-destinations', [down]) self.ixnet.setMultiAttribute( endpoint_set_2, '-name', str(index + 2), - '-sources', [down + '/protocols'], - '-destinations', [up + '/protocols']) + '-sources', [down], + '-destinations', [up]) self.ixnet.commit() index += 2 @@ -281,7 +303,7 @@ class IxNextgen(object): # pragma: no cover '/traffic/protocolTemplate:"{}"'.format(protocol_name)) self.ixnet.execute('append', previous_element, protocol) - def _setup_config_elements(self): + def _setup_config_elements(self, add_default_proto=True): """Setup the config elements The traffic item is configured to allow individual configurations per @@ -303,12 +325,13 @@ class IxNextgen(object): # pragma: no cover self.ixnet.setAttribute(config_element + '/frameRateDistribution', '-streamDistribution', 'splitRateEvenly') self.ixnet.commit() - self._append_procotol_to_stack( - PROTO_UDP, config_element + '/stack:"ethernet-1"') - self._append_procotol_to_stack( - PROTO_IPV4, config_element + '/stack:"ethernet-1"') + if add_default_proto: + self._append_procotol_to_stack( + PROTO_UDP, config_element + '/stack:"ethernet-1"') + self._append_procotol_to_stack( + PROTO_IPV4, config_element + '/stack:"ethernet-1"') - def create_traffic_model(self): + def create_traffic_model(self, uplink_ports, downlink_ports): """Create a traffic item and the needed flow groups Each flow group inside the traffic item (only one is present) @@ -319,10 +342,27 @@ class IxNextgen(object): # pragma: no cover FlowGroup3: port3 -> port4 FlowGroup4: port3 <- port4 """ - self._create_traffic_item() - self._create_flow_groups() + self._create_traffic_item('raw') + uplink_endpoints = [port + '/protocols' for port in uplink_ports] + downlink_endpoints = [port + '/protocols' for port in downlink_ports] + self._create_flow_groups(uplink_endpoints, downlink_endpoints) self._setup_config_elements() + def create_ipv4_traffic_model(self, uplink_topologies, downlink_topologies): + """Create a traffic item and the needed flow groups + + Each flow group inside the traffic item (only one is present) + represents the traffic between two topologies: + (uplink) (downlink) + FlowGroup1: uplink1 -> downlink1 + FlowGroup2: uplink1 <- downlink1 + FlowGroup3: uplink2 -> downlink2 + FlowGroup4: uplink2 <- downlink2 + """ + self._create_traffic_item('ipv4') + self._create_flow_groups(uplink_topologies, downlink_topologies) + self._setup_config_elements(False) + def _update_frame_mac(self, ethernet_descriptor, field, mac_address): """Set the MAC address in a config element stack Ethernet field @@ -366,16 +406,15 @@ class IxNextgen(object): # pragma: no cover raise exceptions.IxNetworkFlowNotPresent(flow_group=fg_id) type = traffic_param.get('traffic_type', 'fixedDuration') - rate = traffic_param['rate'] rate_unit = ( 'framesPerSecond' if traffic_param['rate_unit'] == tp_base.TrafficProfileConfig.RATE_FPS else 'percentLineRate') weighted_range_pairs = self._parse_framesize( - traffic_param['outer_l2']['framesize']) - srcmac = str(traffic_param.get('srcmac', '00:00:00:00:00:01')) - dstmac = str(traffic_param.get('dstmac', '00:00:00:00:00:02')) + traffic_param['outer_l2'].get('framesize', {})) + srcmac = str(traffic_param['outer_l2'].get('srcmac', '00:00:00:00:00:01')) + dstmac = str(traffic_param['outer_l2'].get('dstmac', '00:00:00:00:00:02')) - if traffic_param['outer_l2']['QinQ']: + if traffic_param['outer_l2'].get('QinQ'): s_vlan = traffic_param['outer_l2']['QinQ']['S-VLAN'] c_vlan = traffic_param['outer_l2']['QinQ']['C-VLAN'] @@ -400,21 +439,27 @@ class IxNextgen(object): # pragma: no cover self.ixnet.setMultiAttribute( config_element + '/transmissionControl', '-type', type, '-duration', duration) + self.ixnet.setMultiAttribute( config_element + '/frameRate', - '-rate', rate, '-type', rate_unit) - self.ixnet.setMultiAttribute( - config_element + '/frameSize', - '-type', 'weightedPairs', - '-weightedRangePairs', weighted_range_pairs) + '-rate', traffic_param['rate'], '-type', rate_unit) + + if len(weighted_range_pairs): + self.ixnet.setMultiAttribute( + config_element + '/frameSize', + '-type', 'weightedPairs', + '-weightedRangePairs', weighted_range_pairs) + self.ixnet.commit() - self._update_frame_mac( - self._get_stack_item(fg_id, PROTO_ETHERNET)[0], - 'destinationAddress', dstmac) - self._update_frame_mac( - self._get_stack_item(fg_id, PROTO_ETHERNET)[0], - 'sourceAddress', srcmac) + if dstmac: + self._update_frame_mac( + self._get_stack_item(fg_id, PROTO_ETHERNET)[0], + 'destinationAddress', dstmac) + if srcmac: + self._update_frame_mac( + self._get_stack_item(fg_id, PROTO_ETHERNET)[0], + 'sourceAddress', srcmac) def _update_vlan_tag(self, fg_id, params, vlan=0): field_to_param_map = { @@ -474,19 +519,24 @@ class IxNextgen(object): # pragma: no cover if not self._get_config_element_by_flow_group_name(fg_id): raise exceptions.IxNetworkFlowNotPresent(flow_group=fg_id) - count = traffic_param['outer_l3']['count'] - srcip = str(traffic_param['outer_l3']['srcip']) - dstip = str(traffic_param['outer_l3']['dstip']) - seed = traffic_param['outer_l3']['seed'] - srcmask = traffic_param['outer_l3']['srcmask'] or IP_VERSION_4_MASK - dstmask = traffic_param['outer_l3']['dstmask'] or IP_VERSION_4_MASK - - self._update_ipv4_address( - self._get_stack_item(fg_id, PROTO_IPV4)[0], - 'srcIp', srcip, seed, srcmask, count) - self._update_ipv4_address( - self._get_stack_item(fg_id, PROTO_IPV4)[0], - 'dstIp', dstip, seed, dstmask, count) + if traffic_param['outer_l3']: + count = traffic_param['outer_l3']['count'] + srcip = traffic_param['outer_l3']['srcip'] + dstip = traffic_param['outer_l3']['dstip'] + srcseed = traffic_param['outer_l3']['srcseed'] + dstseed = traffic_param['outer_l3']['dstseed'] + srcmask = traffic_param['outer_l3']['srcmask'] \ + or IP_VERSION_4_MASK + dstmask = traffic_param['outer_l3']['dstmask'] \ + or IP_VERSION_4_MASK + if srcip: + self._update_ipv4_address( + self._get_stack_item(fg_id, PROTO_IPV4)[0], + 'srcIp', str(srcip), srcseed, srcmask, count) + if dstip: + self._update_ipv4_address( + self._get_stack_item(fg_id, PROTO_IPV4)[0], + 'dstIp', str(dstip), dstseed, dstmask, count) def update_l4(self, traffic): """Update the L4 headers @@ -500,7 +550,10 @@ class IxNextgen(object): # pragma: no cover if not self._get_config_element_by_flow_group_name(fg_id): raise exceptions.IxNetworkFlowNotPresent(flow_group=fg_id) - proto = traffic_param['outer_l3']['proto'] + proto = traffic_param['outer_l3'].get('proto') + if not (proto and traffic_param['outer_l4']): + continue + if proto not in SUPPORTED_PROTO: raise exceptions.IXIAUnsupportedProtocol(protocol=proto) @@ -513,12 +566,15 @@ class IxNextgen(object): # pragma: no cover dstport = traffic_param['outer_l4']['dstport'] dstmask = traffic_param['outer_l4']['dstportmask'] - if proto in SUPPORTED_PROTO: - self._update_udp_port(self._get_stack_item(fg_id, proto)[0], - 'srcPort', srcport, seed, srcmask, count) - - self._update_udp_port(self._get_stack_item(fg_id, proto)[0], - 'dstPort', dstport, seed, dstmask, count) + if proto == PROTO_UDP: + if srcport: + self._update_udp_port( + self._get_stack_item(fg_id, proto)[0], + 'srcPort', srcport, seed, srcmask, count) + if dstport: + self._update_udp_port( + self._get_stack_item(fg_id, proto)[0], + 'dstPort', dstport, seed, dstmask, count) def _update_udp_port(self, descriptor, field, value, seed=1, mask=0, count=1): @@ -569,6 +625,12 @@ class IxNextgen(object): # pragma: no cover self.LATENCY_NAME_MAP)) return stats + def start_protocols(self): + self.ixnet.execute('startAllProtocols') + + def stop_protocols(self): + self.ixnet.execute('stopAllProtocols') + def start_traffic(self): """Start the traffic injection in the traffic item @@ -586,3 +648,194 @@ class IxNextgen(object): # pragma: no cover self.ixnet.execute('start', '/traffic') # pylint: disable=unnecessary-lambda utils.wait_until_true(lambda: self.is_traffic_running()) + + def add_topology(self, name, vports): + log.debug("add_topology: name='%s' ports='%s'", name, vports) + obj = self.ixnet.add(self.ixnet.getRoot(), 'topology') + self.ixnet.setMultiAttribute(obj, '-name', name, '-vports', vports) + self.ixnet.commit() + return obj + + def add_device_group(self, topology, name, multiplier): + log.debug("add_device_group: tpl='%s', name='%s', multiplier='%s'", + topology, name, multiplier) + + obj = self.ixnet.add(topology, 'deviceGroup') + self.ixnet.setMultiAttribute(obj, '-name', name, '-multiplier', + multiplier) + self.ixnet.commit() + return obj + + def add_ethernet(self, dev_group, name): + log.debug( + "add_ethernet: device_group='%s' name='%s'", dev_group, name) + obj = self.ixnet.add(dev_group, 'ethernet') + self.ixnet.setMultiAttribute(obj, '-name', name) + self.ixnet.commit() + return obj + + def _create_vlans(self, ethernet, count): + self.ixnet.setMultiAttribute(ethernet, '-useVlans', 'true') + self.ixnet.setMultiAttribute(ethernet, '-vlanCount', count) + self.ixnet.commit() + + def _configure_vlans(self, ethernet, vlans): + vlans_obj = self.ixnet.getList(ethernet, 'vlan') + for i, vlan_obj in enumerate(vlans_obj): + if vlans[i].vlan_id_step is not None: + vlan_id_obj = self.ixnet.getAttribute(vlan_obj, '-vlanId') + self.ixnet.setMultiAttribute(vlan_id_obj, '-clearOverlays', + 'true', '-pattern', 'counter') + vlan_id_counter = self.ixnet.add(vlan_id_obj, 'counter') + self.ixnet.setMultiAttribute(vlan_id_counter, '-start', + vlans[i].vlan_id, '-step', + vlans[i].vlan_id_step, + '-direction', + vlans[i].vlan_id_direction) + else: + vlan_id_obj = self.ixnet.getAttribute(vlan_obj, '-vlanId') + self.ixnet.setMultiAttribute(vlan_id_obj + '/singleValue', + '-value', vlans[i].vlan_id) + + if vlans[i].prio_step is not None: + prio_obj = self.ixnet.getAttribute(vlan_obj, '-priority') + self.ixnet.setMultiAttribute(prio_obj, '-clearOverlays', 'true', + '-pattern', 'counter') + prio_counter = self.ixnet.add(prio_obj, 'counter') + self.ixnet.setMultiAttribute(prio_counter, + '-start', vlans[i].prio, + '-step', vlans[i].prio_step, + '-direction', vlans[i].prio_direction) + elif vlans[i].prio is not None: + prio_obj = self.ixnet.getAttribute(vlan_obj, '-priority') + self.ixnet.setMultiAttribute(prio_obj + '/singleValue', + '-value', vlans[i].prio) + + if vlans[i].tp_id is not None: + tp_id_obj = self.ixnet.getAttribute(vlan_obj, '-tpid') + self.ixnet.setMultiAttribute(tp_id_obj + '/singleValue', + '-value', vlans[i].tp_id) + + self.ixnet.commit() + + def add_vlans(self, ethernet, vlans): + log.debug("add_vlans: ethernet='%s'", ethernet) + + if vlans is None or len(vlans) == 0: + raise RuntimeError( + "Invalid 'vlans' argument. Expected list of Vlan instances.") + + self._create_vlans(ethernet, len(vlans)) + self._configure_vlans(ethernet, vlans) + + def add_ipv4(self, ethernet, name='', + addr=None, addr_step=None, addr_direction='increment', + prefix=None, prefix_step=None, prefix_direction='increment', + gateway=None, gw_step=None, gw_direction='increment'): + log.debug("add_ipv4: ethernet='%s' name='%s'", ethernet, name) + obj = self.ixnet.add(ethernet, 'ipv4') + if name != '': + self.ixnet.setAttribute(obj, '-name', name) + self.ixnet.commit() + + if addr_step is not None: + # handle counter pattern + _address = self.ixnet.getAttribute(obj, '-address') + self.ixnet.setMultiAttribute(_address, '-clearOverlays', 'true', + '-pattern', 'counter') + + address_counter = self.ixnet.add(_address, 'counter') + self.ixnet.setMultiAttribute(address_counter, + '-start', addr, + '-step', addr_step, + '-direction', addr_direction) + elif addr is not None: + # handle single value + _address = self.ixnet.getAttribute(obj, '-address') + self.ixnet.setMultiAttribute(_address + '/singleValue', '-value', + addr) + + if prefix_step is not None: + # handle counter pattern + _prefix = self.ixnet.getAttribute(obj, '-prefix') + self.ixnet.setMultiAttribute(_prefix, '-clearOverlays', 'true', + '-pattern', 'counter') + prefix_counter = self.ixnet.add(_prefix, 'counter') + self.ixnet.setMultiAttribute(prefix_counter, + '-start', prefix, + '-step', prefix_step, + '-direction', prefix_direction) + elif prefix is not None: + # handle single value + _prefix = self.ixnet.getAttribute(obj, '-prefix') + self.ixnet.setMultiAttribute(_prefix + '/singleValue', '-value', + prefix) + + if gw_step is not None: + # handle counter pattern + _gateway = self.ixnet.getAttribute(obj, '-gatewayIp') + self.ixnet.setMultiAttribute(_gateway, '-clearOverlays', 'true', + '-pattern', 'counter') + + gateway_counter = self.ixnet.add(_gateway, 'counter') + self.ixnet.setMultiAttribute(gateway_counter, + '-start', gateway, + '-step', gw_step, + '-direction', gw_direction) + elif gateway is not None: + # handle single value + _gateway = self.ixnet.getAttribute(obj, '-gatewayIp') + self.ixnet.setMultiAttribute(_gateway + '/singleValue', '-value', + gateway) + + self.ixnet.commit() + return obj + + def add_pppox_client(self, xproto, auth, user, pwd): + log.debug( + "add_pppox_client: xproto='%s', auth='%s', user='%s', pwd='%s'", + xproto, auth, user, pwd) + obj = self.ixnet.add(xproto, 'pppoxclient') + self.ixnet.commit() + + if auth == 'pap': + auth_type = self.ixnet.getAttribute(obj, '-authType') + self.ixnet.setMultiAttribute(auth_type + '/singleValue', '-value', + auth) + pap_user = self.ixnet.getAttribute(obj, '-papUser') + self.ixnet.setMultiAttribute(pap_user + '/singleValue', '-value', + user) + pap_pwd = self.ixnet.getAttribute(obj, '-papPassword') + self.ixnet.setMultiAttribute(pap_pwd + '/singleValue', '-value', + pwd) + else: + raise NotImplementedError() + + self.ixnet.commit() + return obj + + def add_bgp(self, ipv4, dut_ip, local_as, bgp_type=None): + """Add BGP protocol""" + log.debug("add_bgp: ipv4='%s', dut_ip='%s', local_as='%s'", ipv4, + dut_ip, local_as) + obj = self.ixnet.add(ipv4, 'bgpIpv4Peer') + self.ixnet.commit() + + # Set DUT IP address + dut_ip_addr = self.ixnet.getAttribute(obj, '-dutIp') + self.ixnet.setAttribute(dut_ip_addr + '/singleValue', + '-value', dut_ip) + + # Set local AS number + local_as_number = self.ixnet.getAttribute(obj, '-localAs2Bytes') + self.ixnet.setAttribute(local_as_number + '/singleValue', + '-value', local_as) + + if bgp_type: + # Set BGP type. If not specified, default value is using. + # Default type is "internal" + bgp_type_field = self.ixnet.getAttribute(obj, '-type') + self.ixnet.setAttribute(bgp_type_field + '/singleValue', + '-value', bgp_type) + self.ixnet.commit() + return obj diff --git a/yardstick/network_services/traffic_profile/__init__.py b/yardstick/network_services/traffic_profile/__init__.py index a1b26a24d..91d8a665f 100644 --- a/yardstick/network_services/traffic_profile/__init__.py +++ b/yardstick/network_services/traffic_profile/__init__.py @@ -28,6 +28,7 @@ def register_modules(): 'yardstick.network_services.traffic_profile.prox_ramp', 'yardstick.network_services.traffic_profile.rfc2544', 'yardstick.network_services.traffic_profile.pktgen', + 'yardstick.network_services.traffic_profile.landslide_profile', ] for module in modules: diff --git a/yardstick/network_services/traffic_profile/base.py b/yardstick/network_services/traffic_profile/base.py index a8f950b7b..ea3f17874 100644 --- a/yardstick/network_services/traffic_profile/base.py +++ b/yardstick/network_services/traffic_profile/base.py @@ -44,6 +44,7 @@ class TrafficProfileConfig(object): self.lower_bound = tprofile.get('lower_bound') self.upper_bound = tprofile.get('upper_bound') self.step_interval = tprofile.get('step_interval') + self.enable_latency = tprofile.get('enable_latency', False) def _parse_rate(self, rate): """Parse traffic profile rate @@ -96,6 +97,9 @@ class TrafficProfile(object): self.params = tp_config self.config = TrafficProfileConfig(tp_config) + def is_ended(self): + return False + def execute_traffic(self, traffic_generator, **kawrgs): """ This methods defines the behavior of the traffic generator. It will be called in a loop until the traffic generator exits. diff --git a/yardstick/network_services/traffic_profile/ixia_rfc2544.py b/yardstick/network_services/traffic_profile/ixia_rfc2544.py index 26dc1fe04..b8aa78d80 100644 --- a/yardstick/network_services/traffic_profile/ixia_rfc2544.py +++ b/yardstick/network_services/traffic_profile/ixia_rfc2544.py @@ -56,67 +56,83 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): if not traffickey.startswith((self.UPLINK, self.DOWNLINK)): continue + # values should be single-item dict, so just grab the first item try: - # values should be single-item dict, so just grab the first item - try: - key, value = next(iter(values.items())) - except StopIteration: - result[traffickey] = {} - continue - - port_id = value.get('id', 1) - port_index = port_id - 1 - - if value.get('outer_l3v4'): - ip = value['outer_l3v4'] - src_key, dst_key = 'srcip4', 'dstip4' - else: - ip = value['outer_l3v6'] - src_key, dst_key = 'srcip6', 'dstip6' - - srcip, srcmask = self._get_ip_and_mask(ip[src_key]) - dstip, dstmask = self._get_ip_and_mask(ip[dst_key]) - - outer_l4 = value.get('outer_l4') - src_port, src_port_mask = self._get_fixed_and_mask(outer_l4['srcport']) - dst_port, dst_port_mask = self._get_fixed_and_mask(outer_l4['dstport']) - result[traffickey] = { - 'bidir': False, - 'id': port_id, - 'rate': self.rate, - 'rate_unit': self.rate_unit, - 'outer_l2': { - 'framesize': value['outer_l2']['framesize'], - 'framesPerSecond': True, - 'QinQ': value['outer_l2'].get('QinQ'), - 'srcmac': mac['src_mac_{}'.format(port_index)], - 'dstmac': mac['dst_mac_{}'.format(port_index)], - }, - 'outer_l3': { - 'count': ip['count'], - 'dscp': ip['dscp'], - 'ttl': ip['ttl'], - 'seed': ip['seed'], - 'srcip': srcip, - 'dstip': dstip, - 'srcmask': srcmask, - 'dstmask': dstmask, - 'type': key, - 'proto': ip['proto'], - }, - 'outer_l4': { - 'srcport': src_port, - 'dstport': dst_port, - 'srcportmask': src_port_mask, - 'dstportmask': dst_port_mask, - 'count': outer_l4['count'], - 'seed': outer_l4['seed'], - } - - } - except KeyError: + key, value = next(iter(values.items())) + except StopIteration: + result[traffickey] = {} continue + port_id = value.get('id', 1) + port_index = port_id - 1 + + result[traffickey] = { + 'bidir': False, + 'id': port_id, + 'rate': self.rate, + 'rate_unit': self.rate_unit, + 'outer_l2': {}, + 'outer_l3': {}, + 'outer_l4': {}, + } + + outer_l2 = value.get('outer_l2') + if outer_l2: + result[traffickey]['outer_l2'].update({ + 'framesize': outer_l2.get('framesize'), + 'framesPerSecond': True, + 'QinQ': outer_l2.get('QinQ'), + 'srcmac': mac.get('src_mac_{}'.format(port_index)), + 'dstmac': mac.get('dst_mac_{}'.format(port_index)), + }) + + if value.get('outer_l3v4'): + outer_l3 = value['outer_l3v4'] + src_key, dst_key = 'srcip4', 'dstip4' + else: + outer_l3 = value.get('outer_l3v6') + src_key, dst_key = 'srcip6', 'dstip6' + if outer_l3: + srcip = srcmask = dstip = dstmask = None + if outer_l3.get(src_key): + srcip, srcmask = self._get_ip_and_mask(outer_l3[src_key]) + if outer_l3.get(dst_key): + dstip, dstmask = self._get_ip_and_mask(outer_l3[dst_key]) + + result[traffickey]['outer_l3'].update({ + 'count': outer_l3.get('count', 1), + 'dscp': outer_l3.get('dscp'), + 'ttl': outer_l3.get('ttl'), + 'srcseed': outer_l3.get('srcseed', 1), + 'dstseed': outer_l3.get('dstseed', 1), + 'srcip': srcip, + 'dstip': dstip, + 'srcmask': srcmask, + 'dstmask': dstmask, + 'type': key, + 'proto': outer_l3.get('proto'), + }) + + outer_l4 = value.get('outer_l4') + if outer_l4: + src_port = src_port_mask = dst_port = dst_port_mask = None + if outer_l4.get('srcport'): + src_port, src_port_mask = ( + self._get_fixed_and_mask(outer_l4['srcport'])) + + if outer_l4.get('dstport'): + dst_port, dst_port_mask = ( + self._get_fixed_and_mask(outer_l4['dstport'])) + + result[traffickey]['outer_l4'].update({ + 'srcport': src_port, + 'dstport': dst_port, + 'srcportmask': src_port_mask, + 'dstportmask': dst_port_mask, + 'count': outer_l4.get('count', 1), + 'seed': outer_l4.get('seed', 1), + }) + return result def _ixia_traffic_generate(self, traffic, ixia_obj): @@ -168,12 +184,8 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): [samples[iface]['in_packets'] for iface in samples]) out_packets_sum = sum( [samples[iface]['out_packets'] for iface in samples]) - rx_throughput = sum( - [samples[iface]['RxThroughput'] for iface in samples]) - rx_throughput = round(float(rx_throughput), 2) - tx_throughput = sum( - [samples[iface]['TxThroughput'] for iface in samples]) - tx_throughput = round(float(tx_throughput), 2) + rx_throughput = round(float(in_packets_sum) / duration, 3) + tx_throughput = round(float(out_packets_sum) / duration, 3) packet_drop = abs(out_packets_sum - in_packets_sum) try: @@ -183,10 +195,6 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): except ZeroDivisionError: LOG.info('No traffic is flowing') - samples['TxThroughput'] = tx_throughput - samples['RxThroughput'] = rx_throughput - samples['DropPercentage'] = drop_percent - if first_run: completed = True if drop_percent <= tolerance else False if (first_run and @@ -200,4 +208,21 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): else: completed = True + latency_ns_avg = float( + sum([samples[iface]['Store-Forward_Avg_latency_ns'] + for iface in samples])) / num_ifaces + latency_ns_min = float( + sum([samples[iface]['Store-Forward_Min_latency_ns'] + for iface in samples])) / num_ifaces + latency_ns_max = float( + sum([samples[iface]['Store-Forward_Max_latency_ns'] + for iface in samples])) / num_ifaces + + samples['TxThroughput'] = tx_throughput + samples['RxThroughput'] = rx_throughput + samples['DropPercentage'] = drop_percent + samples['latency_ns_avg'] = latency_ns_avg + samples['latency_ns_min'] = latency_ns_min + samples['latency_ns_max'] = latency_ns_max + return completed, samples diff --git a/yardstick/network_services/traffic_profile/landslide_profile.py b/yardstick/network_services/traffic_profile/landslide_profile.py new file mode 100644 index 000000000..f79226fb4 --- /dev/null +++ b/yardstick/network_services/traffic_profile/landslide_profile.py @@ -0,0 +1,47 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" Spirent Landslide traffic profile definitions """ + +from yardstick.network_services.traffic_profile import base + + +class LandslideProfile(base.TrafficProfile): + """ + This traffic profile handles attributes of Landslide data stream + """ + + def __init__(self, tp_config): + super(LandslideProfile, self).__init__(tp_config) + + # for backward compatibility support dict and list of dicts + if isinstance(tp_config["dmf_config"], dict): + self.dmf_config = [tp_config["dmf_config"]] + else: + self.dmf_config = tp_config["dmf_config"] + + def execute(self, traffic_generator): + pass + + def update_dmf(self, options): + if 'dmf' in options: + if isinstance(options['dmf'], dict): + _dmfs = [options['dmf']] + else: + _dmfs = options['dmf'] + + for index, _dmf in enumerate(_dmfs): + try: + self.dmf_config[index].update(_dmf) + except IndexError: + pass diff --git a/yardstick/network_services/traffic_profile/prox_binsearch.py b/yardstick/network_services/traffic_profile/prox_binsearch.py index 506a880e0..f924cf419 100644 --- a/yardstick/network_services/traffic_profile/prox_binsearch.py +++ b/yardstick/network_services/traffic_profile/prox_binsearch.py @@ -25,6 +25,14 @@ from yardstick.common import constants as overall_constants LOG = logging.getLogger(__name__) +STATUS_SUCCESS = "Success" +STATUS_FAIL = "Failure" +STATUS_RESULT = "Result" +STEP_CONFIRM = "Confirm retry" +STEP_INCREASE_LOWER = "Increase lower" +STEP_DECREASE_LOWER = "Decrease lower" +STEP_DECREASE_UPPER = "Decrease upper" + class ProxBinSearchProfile(ProxProfile): """ @@ -58,6 +66,9 @@ class ProxBinSearchProfile(ProxProfile): yield test_value test_value = self.mid_point + def is_ended(self): + return self.done.is_set() + def run_test_with_pkt_size(self, traffic_gen, pkt_size, duration): """Run the test for a single packet size. @@ -85,18 +96,16 @@ class ProxBinSearchProfile(ProxProfile): # success, the binary search will complete on an integer multiple # of the precision, rather than on a fraction of it. - theor_max_thruput = actual_max_thruput = 0 + theor_max_thruput = 0.0 result_samples = {} - # Store one time only value in influxdb - single_samples = { + test_data = { "test_duration": traffic_gen.scenario_helper.scenario_cfg["runner"]["duration"], "test_precision": self.params["traffic_profile"]["test_precision"], "tolerated_loss": self.params["traffic_profile"]["tolerated_loss"], "duration": duration } - self.queue.put(single_samples) self.prev_time = time.time() # throughput and packet loss from the most recent successful test @@ -110,85 +119,88 @@ class ProxBinSearchProfile(ProxProfile): neg_retry = 0 total_retry = 0 - LOG.info("Checking MAX %s MIN %s TEST %s", - self.current_upper, self.lower_bound, test_value) + LOG.info("Checking MAX %s MIN %s TEST %s", self.current_upper, + self.lower_bound, test_value) + while (pos_retry <= ok_retry) and (neg_retry <= ok_retry): total_retry = total_retry + 1 + result, port_samples = self._profile_helper.run_test(pkt_size, duration, test_value, self.tolerated_loss, line_speed) - if (total_retry > (ok_retry * 3)) and (ok_retry is not 0): - LOG.info("Failure.!! .. RETRY EXCEEDED ... decrease lower bound") + if (total_retry > (ok_retry * 3)) and (ok_retry is not 0): + status = STATUS_FAIL + next_step = STEP_DECREASE_LOWER successful_pkt_loss = result.pkt_loss - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) - self.current_upper = test_value neg_retry = total_retry elif result.success: if (pos_retry < ok_retry) and (ok_retry is not 0): - neg_retry = 0 - LOG.info("Success! ... confirm retry") - + status = STATUS_SUCCESS + next_step = STEP_CONFIRM successful_pkt_loss = result.pkt_loss - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) - + neg_retry = 0 else: - LOG.info("Success! Increasing lower bound") + status = STATUS_SUCCESS + next_step = STEP_INCREASE_LOWER self.current_lower = test_value - successful_pkt_loss = result.pkt_loss - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) - - # store results with success tag in influxdb - success_samples = \ - {'Success_' + key: value for key, value in samples.items()} - - success_samples["Success_rx_total"] = int(result.rx_total) - success_samples["Success_tx_total"] = int(result.tx_total) - success_samples["Success_can_be_lost"] = int(result.can_be_lost) - success_samples["Success_drop_total"] = int(result.drop_total) - success_samples["Success_RxThroughput"] = samples["RxThroughput"] - success_samples["Success_RxThroughput_gbps"] = \ - (samples["RxThroughput"] / 1000) * ((pkt_size + 20)* 8) - LOG.info(">>>##>>Collect SUCCESS TG KPIs %s %s", - datetime.datetime.now(), success_samples) - self.queue.put(success_samples, True, overall_constants.QUEUE_PUT_TIMEOUT) - - # Store Actual throughput for result samples - actual_max_thruput = success_samples["Success_RxThroughput"] pos_retry = pos_retry + 1 else: if (neg_retry < ok_retry) and (ok_retry is not 0): - + status = STATUS_FAIL + next_step = STEP_CONFIRM pos_retry = 0 - LOG.info("failure! ... confirm retry") else: - LOG.info("Failure... Decreasing upper bound") + status = STATUS_FAIL + next_step = STEP_DECREASE_UPPER self.current_upper = test_value neg_retry = neg_retry + 1 - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + LOG.info( + "Status = '%s' Next_Step = '%s'", status, next_step) + + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) if theor_max_thruput < samples["TxThroughput"]: theor_max_thruput = samples['TxThroughput'] - self.queue.put({'theor_max_throughput': theor_max_thruput}) - - LOG.info(">>>##>>Collect TG KPIs %s %s", datetime.datetime.now(), samples) + samples['theor_max_throughput'] = theor_max_thruput + + samples["rx_total"] = int(result.rx_total) + samples["tx_total"] = int(result.tx_total) + samples["can_be_lost"] = int(result.can_be_lost) + samples["drop_total"] = int(result.drop_total) + samples["RxThroughput_gbps"] = \ + (samples["RxThroughput"] / 1000) * ((pkt_size + 20) * 8) + samples['Status'] = status + samples['Next_Step'] = next_step samples["MAX_Rate"] = self.current_upper samples["MIN_Rate"] = self.current_lower samples["Test_Rate"] = test_value samples["Step_Id"] = step_id samples["Confirmation_Retry"] = total_retry + + samples.update(test_data) + + if status == STATUS_SUCCESS and next_step == STEP_INCREASE_LOWER: + # Store success samples for result samples + result_samples = samples + + LOG.info(">>>##>>Collect TG KPIs %s %s", datetime.datetime.now(), samples) + self.queue.put(samples, True, overall_constants.QUEUE_PUT_TIMEOUT) - LOG.info(">>>##>> Result Reached PktSize %s Theor_Max_Thruput %s Actual_throughput %s", - pkt_size, theor_max_thruput, actual_max_thruput) - result_samples["Result_pktSize"] = pkt_size - result_samples["Result_theor_max_throughput"] = theor_max_thruput - result_samples["Result_Actual_throughput"] = actual_max_thruput + LOG.info( + ">>>##>> Result Reached PktSize %s Theor_Max_Thruput %s Actual_throughput %s", + pkt_size, theor_max_thruput, result_samples.get("RxThroughput", 0.0)) + result_samples["Status"] = STATUS_RESULT + result_samples["Next_Step"] = "" + result_samples["Actual_throughput"] = result_samples.get("RxThroughput", 0.0) + result_samples["theor_max_throughput"] = theor_max_thruput self.queue.put(result_samples) diff --git a/yardstick/network_services/traffic_profile/prox_profile.py b/yardstick/network_services/traffic_profile/prox_profile.py index 343ef1da2..de4b3f9a0 100644 --- a/yardstick/network_services/traffic_profile/prox_profile.py +++ b/yardstick/network_services/traffic_profile/prox_profile.py @@ -16,6 +16,7 @@ from __future__ import absolute_import import logging +import multiprocessing from yardstick.network_services.traffic_profile.base import TrafficProfile from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxProfileHelper @@ -56,7 +57,7 @@ class ProxProfile(TrafficProfile): def __init__(self, tp_config): super(ProxProfile, self).__init__(tp_config) self.queue = None - self.done = False + self.done = multiprocessing.Event() self.results = [] # TODO: get init values from tp_config @@ -116,7 +117,7 @@ class ProxProfile(TrafficProfile): try: pkt_size = next(self.pkt_size_iterator) except StopIteration: - self.done = True + self.done.set() return # Adjust packet size upwards if it's less than the minimum diff --git a/yardstick/network_services/traffic_profile/rfc2544.py b/yardstick/network_services/traffic_profile/rfc2544.py index b54fc575f..e33c437c9 100644 --- a/yardstick/network_services/traffic_profile/rfc2544.py +++ b/yardstick/network_services/traffic_profile/rfc2544.py @@ -19,6 +19,7 @@ from trex_stl_lib import trex_stl_client from trex_stl_lib import trex_stl_packet_builder_scapy from trex_stl_lib import trex_stl_streams +from yardstick.common import constants from yardstick.network_services.traffic_profile import trex_traffic_profile @@ -118,7 +119,8 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): ports.append(port_num) port_pg_id.add_port(port_num) profile = self._create_profile(profile_data, - self.rate, port_pg_id) + self.rate, port_pg_id, + self.config.enable_latency) self.generator.client.add_streams(profile, ports=[port_num]) self.generator.client.start(ports=ports, @@ -126,7 +128,7 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): force=True) return ports, port_pg_id - def _create_profile(self, profile_data, rate, port_pg_id): + def _create_profile(self, profile_data, rate, port_pg_id, enable_latency): """Create a STL profile (list of streams) for a port""" streams = [] for packet_name in profile_data: @@ -134,11 +136,13 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): get('outer_l2', {}).get('framesize')) imix_data = self._create_imix_data(imix) self._create_vm(profile_data[packet_name]) - _streams = self._create_streams(imix_data, rate, port_pg_id) + _streams = self._create_streams(imix_data, rate, port_pg_id, + enable_latency) streams.extend(_streams) return trex_stl_streams.STLProfile(streams) - def _create_imix_data(self, imix): + def _create_imix_data(self, imix, + weight_mode=constants.DISTRIBUTION_IN_PACKETS): """Generate the IMIX distribution for a STL profile The input information is the framesize dictionary in a test case @@ -157,6 +161,20 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): E.g.: imix_count = {64: 25, 128: 75} + The weight mode is described in [1]. There are two ways to describe the + weight of the packets: + - Distribution in packets: the weight defines the percentage of + packets sent per packet size. IXIA uses this definition. + - Distribution in bytes: the weight defines the percentage of bytes + sent per packet size. + + Packet size # packets D. in packets Bytes D. in bytes + 40 7 58.33% 280 7% + 576 4 33.33% 2304 56% + 1500 1 8.33% 1500 37% + + [1] https://en.wikipedia.org/wiki/Internet_Mix + :param imix: (dict) IMIX size and weight """ imix_count = {} @@ -171,8 +189,16 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): imix_sum = 100 weight_normalize = float(imix_sum) / 100 - return {size: float(weight) / weight_normalize - for size, weight in imix_count.items()} + imix_dip = {size: float(weight) / weight_normalize + for size, weight in imix_count.items()} + + if weight_mode == constants.DISTRIBUTION_IN_BYTES: + return imix_dip + + byte_total = sum([int(size) * weight + for size, weight in imix_dip.items()]) + return {size: (int(size) * weight * 100) / byte_total + for size, weight in imix_dip.items()} def _create_vm(self, packet_definition): """Create the STL Raw instructions""" @@ -213,7 +239,7 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): return trex_stl_packet_builder_scapy.STLPktBuilder( pkt=base_pkt / pad, vm=self.trex_vm) - def _create_streams(self, imix_data, rate, port_pg_id): + def _create_streams(self, imix_data, rate, port_pg_id, enable_latency): """Create a list of streams per packet size The STL TX mode speed of the generated streams will depend on the frame @@ -237,7 +263,8 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): in imix_data.items() if float(weight) > 0): packet = self._create_single_packet(size) pg_id = port_pg_id.increase_pg_id() - stl_flow = trex_stl_streams.STLFlowLatencyStats(pg_id=pg_id) + stl_flow = (trex_stl_streams.STLFlowLatencyStats(pg_id=pg_id) if + enable_latency else None) mode = trex_stl_streams.STLTXCont(percentage=weight * rate / 100) streams.append(trex_stl_client.STLStream( packet=packet, flow_stats=stl_flow, mode=mode)) @@ -247,19 +274,16 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): correlated_traffic): """Calculate the drop percentage and run the traffic""" completed = False - tx_rate_fps = 0 - rx_rate_fps = 0 - for sample in samples: - tx_rate_fps += sum( - port['tx_throughput_fps'] for port in sample.values()) - rx_rate_fps += sum( - port['rx_throughput_fps'] for port in sample.values()) - tx_rate_fps = round(float(tx_rate_fps) / len(samples), 2) - rx_rate_fps = round(float(rx_rate_fps) / len(samples), 2) - - # TODO(esm): RFC2544 doesn't tolerate packet loss, why do we? - out_packets = sum(port['out_packets'] for port in samples[-1].values()) - in_packets = sum(port['in_packets'] for port in samples[-1].values()) + out_pkt_end = sum(port['out_packets'] for port in samples[-1].values()) + in_pkt_end = sum(port['in_packets'] for port in samples[-1].values()) + out_pkt_ini = sum(port['out_packets'] for port in samples[0].values()) + in_pkt_ini = sum(port['in_packets'] for port in samples[0].values()) + time_diff = (list(samples[-1].values())[0]['timestamp'] - + list(samples[0].values())[0]['timestamp']).total_seconds() + out_packets = out_pkt_end - out_pkt_ini + in_packets = in_pkt_end - in_pkt_ini + tx_rate_fps = float(out_packets) / time_diff + rx_rate_fps = float(in_packets) / time_diff drop_percent = 100.0 # https://tools.ietf.org/html/rfc2544#section-26.3 diff --git a/yardstick/network_services/utils.py b/yardstick/network_services/utils.py index 4b987fafe..9c64fecde 100644 --- a/yardstick/network_services/utils.py +++ b/yardstick/network_services/utils.py @@ -36,6 +36,9 @@ OPTS = [ cfg.StrOpt('trex_client_lib', default=os.path.join(NSB_ROOT, 'trex_client/stl'), help='trex python library path.'), + cfg.StrOpt('jre_path_i386', + default='', + help='path to installation of 32-bit Java 1.7+.'), ] CONF.register_opts(OPTS, group="nsb") diff --git a/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py new file mode 100644 index 000000000..115fddcf0 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py @@ -0,0 +1,46 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from yardstick.network_services.vnf_generic.vnf import base + +LOG = logging.getLogger(__name__) + + +class AgnosticVnf(base.GenericVNF): + """ AgnosticVnf implementation. """ + def __init__(self, name, vnfd, task_id): + super(AgnosticVnf, self).__init__(name, vnfd, task_id) + + def instantiate(self, scenario_cfg, context_cfg): + pass + + def wait_for_instantiate(self): + pass + + def terminate(self): + pass + + def scale(self, flavor=""): + pass + + def collect_kpi(self): + pass + + def start_collect(self): + pass + + def stop_collect(self): + pass diff --git a/yardstick/network_services/vnf_generic/vnf/epc_vnf.py b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py new file mode 100644 index 000000000..66d16d07f --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py @@ -0,0 +1,53 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from yardstick.network_services.vnf_generic.vnf import base + +LOG = logging.getLogger(__name__) + + +class EPCVnf(base.GenericVNF): + + def __init__(self, name, vnfd, task_id): + super(EPCVnf, self).__init__(name, vnfd, task_id) + + def instantiate(self, scenario_cfg, context_cfg): + """Prepare VNF for operation and start the VNF process/VM + + :param scenario_cfg: Scenario config + :param context_cfg: Context config + """ + pass + + def wait_for_instantiate(self): + """Wait for VNF to start""" + pass + + def terminate(self): + """Kill all VNF processes""" + pass + + def scale(self, flavor=""): + pass + + def collect_kpi(self): + pass + + def start_collect(self): + pass + + def stop_collect(self): + pass diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 3241719e8..321c05779 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -984,7 +984,7 @@ class ProxResourceHelper(ClientResourceHelper): def _run_traffic_once(self, traffic_profile): traffic_profile.execute_traffic(self) - if traffic_profile.done: + if traffic_profile.done.is_set(): self._queue.put({'done': True}) LOG.debug("tg_prox done") self._terminated.value = 1 diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py new file mode 100644 index 000000000..a146b72ca --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py @@ -0,0 +1,1203 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import logging +import requests +import six +import time + +from yardstick.common import exceptions +from yardstick.common import utils as common_utils +from yardstick.common import yaml_loader +from yardstick.network_services import utils as net_serv_utils +from yardstick.network_services.vnf_generic.vnf import sample_vnf + +try: + from lsapi import LsApi +except ImportError: + LsApi = common_utils.ErrorClass + +LOG = logging.getLogger(__name__) + + +class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen): + APP_NAME = 'LandslideTG' + + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = LandslideResourceHelper + super(LandslideTrafficGen, self).__init__(name, vnfd, task_id, + setup_env_helper_type, + resource_helper_type) + + self.bin_path = net_serv_utils.get_nsb_option('bin_path') + self.name = name + self.runs_traffic = True + self.traffic_finished = False + self.session_profile = None + + def listen_traffic(self, traffic_profile): + pass + + def terminate(self): + self.resource_helper.disconnect() + + def instantiate(self, scenario_cfg, context_cfg): + super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg) + self.resource_helper.connect() + + # Create test servers + test_servers = [x['test_server'] for x in self.vnfd_helper['config']] + self.resource_helper.create_test_servers(test_servers) + + # Create SUTs + [self.resource_helper.create_suts(x['suts']) for x in + self.vnfd_helper['config']] + + # Fill in test session based on session profile and test case options + self._load_session_profile() + + def run_traffic(self, traffic_profile): + self.resource_helper.abort_running_tests() + # Update DMF profile with related test case options + traffic_profile.update_dmf(self.scenario_helper.all_options) + # Create DMF in test user library + self.resource_helper.create_dmf(traffic_profile.dmf_config) + # Create/update test session in test user library + self.resource_helper.create_test_session(self.session_profile) + # Start test session + self.resource_helper.create_running_tests(self.session_profile['name']) + + def collect_kpi(self): + return self.resource_helper.collect_kpi() + + def wait_for_instantiate(self): + pass + + @staticmethod + def _update_session_suts(suts, testcase): + """ Create SUT entry. Update related EPC block in session profile. """ + for sut in suts: + # Update session profile EPC element with SUT info from pod file + tc_role = testcase['parameters'].get(sut['role']) + if tc_role: + _param = {} + if tc_role['class'] == 'Sut': + _param['name'] = sut['name'] + elif tc_role['class'] == 'TestNode': + _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'} + if x in sut and sut[x]}) + testcase['parameters'][sut['role']].update(_param) + else: + LOG.info('Unexpected SUT role in pod file: "%s".', sut['role']) + return testcase + + def _update_session_test_servers(self, test_server, _tsgroup_index): + """ Update tsId, reservations, pre-resolved ARP in session profile """ + # Update test server name + test_groups = self.session_profile['tsGroups'] + test_groups[_tsgroup_index]['tsId'] = test_server['name'] + + # Update preResolvedArpAddress + arp_key = 'preResolvedArpAddress' + _preresolved_arp = test_server.get(arp_key) # list of dicts + if _preresolved_arp: + test_groups[_tsgroup_index][arp_key] = _preresolved_arp + + # Update reservations + if 'phySubnets' in test_server: + reservation = {'tsId': test_server['name'], + 'tsIndex': _tsgroup_index, + 'tsName': test_server['name'], + 'phySubnets': test_server['phySubnets']} + if 'reservations' in self.session_profile: + self.session_profile['reservations'].append(reservation) + else: + self.session_profile['reservePorts'] = 'true' + self.session_profile['reservations'] = [reservation] + + @staticmethod + def _update_session_tc_params(tc_options, testcase): + for _param_key in tc_options: + if _param_key == 'AssociatedPhys': + testcase[_param_key] = tc_options[_param_key] + continue + testcase['parameters'][_param_key] = tc_options[_param_key] + return testcase + + def _load_session_profile(self): + + with common_utils.open_relative_file( + self.scenario_helper.scenario_cfg['session_profile'], + self.scenario_helper.task_path) as stream: + self.session_profile = yaml_loader.yaml_load(stream) + + # Raise exception if number of entries differs in following files, + _config_files = ['pod file', 'session_profile file', 'test_case file'] + # Count testcases number in all tsGroups of session profile + session_tests_num = [xx for x in self.session_profile['tsGroups'] + for xx in x['testCases']] + # Create a set containing number of list elements in each structure + _config_files_blocks_num = [ + len(x) for x in + (self.vnfd_helper['config'], # test_servers and suts info + session_tests_num, + self.scenario_helper.all_options['test_cases'])] # test case file + + if len(set(_config_files_blocks_num)) != 1: + raise RuntimeError('Unequal number of elements. {}'.format( + dict(six.moves.zip_longest(_config_files, + _config_files_blocks_num)))) + + ts_names = set() + _tsgroup_idx = -1 + _testcase_idx = 0 + + # Iterate over data structures to overwrite session profile defaults + # _config: single list element holding test servers and SUTs info + # _tc_options: single test case parameters + for _config, tc_options in zip( + self.vnfd_helper['config'], # test servers and SUTS + self.scenario_helper.all_options['test_cases']): # testcase + + _ts_config = _config['test_server'] + + # Calculate test group/test case indexes based on test server name + if _ts_config['name'] in ts_names: + _testcase_idx += 1 + else: + _tsgroup_idx += 1 + _testcase_idx = 0 + + _testcase = \ + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx] + + if _testcase['type'] != _ts_config['role']: + raise RuntimeError( + 'Test type mismatch in TC#{} of test server {}'.format( + _testcase_idx, _ts_config['name'])) + + # Fill session profile with test servers parameters + if _ts_config['name'] not in ts_names: + self._update_session_test_servers(_ts_config, _tsgroup_idx) + ts_names.add(_ts_config['name']) + + # Fill session profile with suts parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_suts(_config['suts'], _testcase)) + + # Update test case parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_tc_params(tc_options, _testcase)) + + +class LandslideResourceHelper(sample_vnf.ClientResourceHelper): + """Landslide TG helper class""" + + REST_STATUS_CODES = {'OK': 200, 'CREATED': 201, 'NO CHANGE': 409} + REST_API_CODES = {'NOT MODIFIED': 500810} + + def __init__(self, setup_helper): + super(LandslideResourceHelper, self).__init__(setup_helper) + self._result = {} + self.vnfd_helper = setup_helper.vnfd_helper + self.scenario_helper = setup_helper.scenario_helper + + # TAS Manager config initialization + self._url = None + self._user_id = None + self.session = None + self.license_data = {} + + # TCL session initialization + self._tcl = LandslideTclClient(LsTclHandler(), self) + + self.session = requests.Session() + self.running_tests_uri = 'runningTests' + self.test_session_uri = 'testSessions' + self.test_serv_uri = 'testServers' + self.suts_uri = 'suts' + self.users_uri = 'users' + self.user_lib_uri = None + self.run_id = None + + def abort_running_tests(self, timeout=60, delay=5): + """ Abort running test sessions, if any """ + _start_time = time.time() + while time.time() < _start_time + timeout: + run_tests_states = {x['id']: x['testStateOrStep'] + for x in self.get_running_tests()} + if not set(run_tests_states.values()).difference( + {'COMPLETE', 'COMPLETE_ERROR'}): + break + else: + [self.stop_running_tests(running_test_id=_id, force=True) + for _id, _state in run_tests_states.items() + if 'COMPLETE' not in _state] + time.sleep(delay) + else: + raise RuntimeError( + 'Some test runs not stopped during {} seconds'.format(timeout)) + + def _build_url(self, resource, action=None): + """ Build URL string + + :param resource: REST API resource name + :type resource: str + :param action: actions name and value + :type action: dict('name': <str>, 'value': <str>) + :returns str: REST API resource name with optional action info + """ + # Action is optional and accepted only in presence of resource param + if action and not resource: + raise ValueError("Resource name not provided") + # Concatenate actions + _action = ''.join(['?{}={}'.format(k, v) for k, v in + action.items()]) if action else '' + + return ''.join([self._url, resource, _action]) + + def get_response_params(self, method, resource, params=None): + """ Retrieve params from JSON response of specific resource URL + + :param method: one of supported REST API methods + :type method: str + :param resource: URI, requested resource name + :type resource: str + :param params: attributes to be found in JSON response + :type params: list(str) + """ + _res = [] + params = params if params else [] + response = self.exec_rest_request(method, resource) + # Get substring between last slash sign and question mark (if any) + url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0] + _response_json = response.json() + # Expect dict(), if URL last part and top dict key don't match + # Else, if they match, expect list() + k, v = list(_response_json.items())[0] + if k != url_last_part: + v = [v] # v: list(dict(str: str)) + # Extract params, or whole list of dicts (without top level key) + for x in v: + _res.append({param: x[param] for param in params} if params else x) + return _res + + def _create_user(self, auth, level=1): + """ Create new user + + :param auth: data to create user account on REST server + :type auth: dict + :param level: Landslide user permissions level + :type level: int + :returns int: user id + """ + # Set expiration date in two years since account creation date + _exp_date = time.strftime( + '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2)) + _username = auth['user'] + _fields = {"contactInformation": "", "expiresOn": _exp_date, + "fullName": "Test User", + "isActive": "true", "level": level, + "password": auth['password'], + "username": _username} + _response = self.exec_rest_request('post', self.users_uri, + json_data=_fields, raise_exc=False) + _resp_json = _response.json() + if _response.status_code == self.REST_STATUS_CODES['CREATED']: + # New user created + _id = _resp_json['id'] + LOG.info("New user created: username='%s', id='%s'", _username, + _id) + elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']: + # User already exists + LOG.info("Account '%s' already exists.", _username) + # Get user id + _id = self._modify_user(_username, {"isActive": "true"})['id'] + else: + raise exceptions.RestApiError( + 'Error during new user "{}" creation'.format(_username)) + return _id + + def _modify_user(self, username, fields): + """ Modify information about existing user + + :param username: user name of account to be modified + :type username: str + :param fields: data to modify user account on REST server + :type fields: dict + :returns dict: user info + """ + _response = self.exec_rest_request('post', self.users_uri, + action={'username': username}, + json_data=fields, raise_exc=False) + if _response.status_code == self.REST_STATUS_CODES['OK']: + _response = _response.json() + else: + raise exceptions.RestApiError( + 'Error during user "{}" data update: {}'.format( + username, + _response.status_code)) + LOG.info("User account '%s' modified: '%s'", username, _response) + return _response + + def _delete_user(self, username): + """ Delete user account + + :param username: username field + :type username: str + :returns bool: True if succeeded + """ + self.exec_rest_request('delete', self.users_uri, + action={'username': username}) + + def _get_users(self, username=None): + """ Get user records from REST server + + :param username: username field + :type username: None|str + :returns list(dict): empty list, or user record, or list of all users + """ + _response = self.get_response_params('get', self.users_uri) + _res = [u for u in _response if + u['username'] == username] if username else _response + return _res + + def exec_rest_request(self, method, resource, action=None, json_data=None, + logs=True, raise_exc=True): + """ Execute REST API request, return response object + + :param method: one of supported requests ('post', 'get', 'delete') + :type method: str + :param resource: URL of resource + :type resource: str + :param action: data used to provide URI located after question mark + :type action: dict + :param json_data: mandatory only for 'post' method + :type json_data: dict + :param logs: debug logs display flag + :type raise_exc: bool + :param raise_exc: if True, raise exception on REST API call error + :returns requests.Response(): REST API call response object + """ + json_data = json_data if json_data else {} + action = action if action else {} + _method = method.upper() + method = method.lower() + if method not in ('post', 'get', 'delete'): + raise ValueError("Method '{}' not supported".format(_method)) + + if method == 'post' and not action: + if not (json_data and isinstance(json_data, collections.Mapping)): + raise ValueError( + 'JSON data missing in {} request'.format(_method)) + + r = getattr(self.session, method)(self._build_url(resource, action), + json=json_data) + if raise_exc and not r.ok: + msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format( + method, self._build_url(resource, action), r.reason) + raise exceptions.RestApiError(msg) + + if logs: + LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method, + r.request.url) + LOG.debug("Response: %s", r.json()) + return r + + def connect(self): + """Connect to RESTful server using test user account""" + tas_info = self.vnfd_helper['mgmt-interface'] + # Supported REST Server ports: HTTP - 8080, HTTPS - 8181 + _port = '8080' if tas_info['proto'] == 'http' else '8181' + tas_info.update({'port': _port}) + self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info) + self.session.headers.update({'Accept': 'application/json', + 'Content-type': 'application/json'}) + # Login with super user to create test user + self.session.auth = ( + tas_info['super-user'], tas_info['super-user-password']) + LOG.info("Connect using superuser: server='%s'", self._url) + auth = {x: tas_info[x] for x in ('user', 'password')} + self._user_id = self._create_user(auth) + # Login with test user + self.session.auth = auth['user'], auth['password'] + # Test user validity + self.exec_rest_request('get', '') + + self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri) + LOG.info("Login with test user: server='%s'", self._url) + # Read existing license + self.license_data['lic_id'] = tas_info['license'] + + # Tcl client init + self._tcl.connect(tas_info['ip'], *self.session.auth) + + return self.session + + def disconnect(self): + self.session = None + self._tcl.disconnect() + + def terminate(self): + self._terminated.value = 1 + + def create_dmf(self, dmf): + if isinstance(dmf, list): + for _dmf in dmf: + self._tcl.create_dmf(_dmf) + else: + self._tcl.create_dmf(dmf) + + def delete_dmf(self, dmf): + if isinstance(dmf, list): + for _dmf in dmf: + self._tcl.delete_dmf(_dmf) + else: + self._tcl.delete_dmf(dmf) + + def create_suts(self, suts): + # Keep only supported keys in suts object + for _sut in suts: + sut_entry = {k: v for k, v in _sut.items() + if k not in {'phy', 'nextHop', 'role'}} + _response = self.exec_rest_request( + 'post', self.suts_uri, json_data=sut_entry, + logs=False, raise_exc=False) + if _response.status_code != self.REST_STATUS_CODES['CREATED']: + LOG.info(_response.reason) # Failed to create + _name = sut_entry.pop('name') + # Modify existing SUT + self.configure_sut(sut_name=_name, json_data=sut_entry) + else: + LOG.info("SUT created: %s", sut_entry) + + def get_suts(self, suts_id=None): + if suts_id: + _suts = self.exec_rest_request( + 'get', '{}/{}'.format(self.suts_uri, suts_id)).json() + else: + _suts = self.get_response_params('get', self.suts_uri) + + return _suts + + def configure_sut(self, sut_name, json_data): + """ Modify information of specific SUTs + + :param sut_name: name of existing SUT + :type sut_name: str + :param json_data: SUT settings + :type json_data: dict() + """ + LOG.info("Modifying SUT information...") + _response = self.exec_rest_request('post', + self.suts_uri, + action={'name': sut_name}, + json_data=json_data, + raise_exc=False) + if _response.status_code not in {self.REST_STATUS_CODES[x] for x in + {'OK', 'NO CHANGE'}}: + raise exceptions.RestApiError(_response.reason) + + LOG.info("Modified SUT: %s", sut_name) + + def delete_suts(self, suts_ids=None): + if not suts_ids: + _curr_suts = self.get_response_params('get', self.suts_uri) + suts_ids = [x['id'] for x in _curr_suts] + LOG.info("Deleting SUTs with following IDs: %s", suts_ids) + for _id in suts_ids: + self.exec_rest_request('delete', + '{}/{}'.format(self.suts_uri, _id)) + LOG.info("\tDone for SUT id: %s", _id) + + def _check_test_servers_state(self, test_servers_ids=None, delay=10, + timeout=300): + LOG.info("Waiting for related test servers state change to READY...") + # Wait on state change + _start_time = time.time() + while time.time() - _start_time < timeout: + ts_ids_not_ready = {x['id'] for x in + self.get_test_servers(test_servers_ids) + if x['state'] != 'READY'} + if ts_ids_not_ready == set(): + break + time.sleep(delay) + else: + raise RuntimeError( + 'Test servers not in READY state after {} seconds.'.format( + timeout)) + + def create_test_servers(self, test_servers): + """ Create test servers + + :param test_servers: input data for test servers creation + mandatory fields: managementIp + optional fields: name + :type test_servers: list(dict) + """ + _ts_ids = [] + for _ts in test_servers: + _msg = 'Created test server "%(name)s"' + _ts_ids.append(self._tcl.create_test_server(_ts)) + if _ts.get('thread_model'): + _msg += ' in mode: "%(thread_model)s"' + LOG.info(_msg, _ts) + + self._check_test_servers_state(_ts_ids) + + def get_test_servers(self, test_server_ids=None): + if not test_server_ids: # Get all test servers info + _test_servers = self.exec_rest_request( + 'get', self.test_serv_uri).json()[self.test_serv_uri] + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + _test_servers = [] + for _id in test_server_ids: + _test_servers.append(self.exec_rest_request( + 'get', '{}/{}'.format(self.test_serv_uri, _id)).json()) + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + def configure_test_servers(self, action, json_data=None, + test_server_ids=None): + if not test_server_ids: + test_server_ids = [x['id'] for x in self.get_test_servers()] + elif isinstance(test_server_ids, int): + test_server_ids = [test_server_ids] + for _id in test_server_ids: + self.exec_rest_request('post', + '{}/{}'.format(self.test_serv_uri, _id), + action=action, json_data=json_data) + LOG.info("Test server (id: %s) configuration done: %s", _id, + action) + return test_server_ids + + def delete_test_servers(self, test_servers_ids=None): + # Delete test servers + for _ts in self.get_test_servers(test_servers_ids): + self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri, + _ts['id'])) + LOG.info("Deleted test server: %s", _ts['name']) + + def create_test_session(self, test_session): + # Use tcl client to create session + test_session['library'] = self._user_id + LOG.debug("Creating session='%s'", test_session['name']) + self._tcl.create_test_session(test_session) + + def get_test_session(self, test_session_name=None): + if test_session_name: + uri = 'libraries/{}/{}/{}'.format(self._user_id, + self.test_session_uri, + test_session_name) + else: + uri = self.user_lib_uri.format(self._user_id) + _test_sessions = self.exec_rest_request('get', uri).json() + return _test_sessions + + def configure_test_session(self, template_name, test_session): + # Override specified test session parameters + LOG.info('Update test session parameters: %s', test_session['name']) + test_session.update({'library': self._user_id}) + return self.exec_rest_request( + method='post', + action={'action': 'overrideAndSaveAs'}, + json_data=test_session, + resource='{}/{}'.format(self.user_lib_uri.format(self._user_id), + template_name)) + + def delete_test_session(self, test_session): + return self.exec_rest_request('delete', '{}/{}'.format( + self.user_lib_uri.format(self._user_id), test_session)) + + def create_running_tests(self, test_session_name): + r = self.exec_rest_request('post', + self.running_tests_uri, + json_data={'library': self._user_id, + 'name': test_session_name}) + if r.status_code != self.REST_STATUS_CODES['CREATED']: + raise exceptions.RestApiError('Failed to start test session.') + self.run_id = r.json()['id'] + + def get_running_tests(self, running_test_id=None): + """Get JSON structure of specified running test entity + + :param running_test_id: ID of created running test entity + :type running_test_id: int + :returns list: running tests entity + """ + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + _res = self.exec_rest_request('get', _res_name, logs=False).json() + # If no run_id specified, skip top level key in response dict. + # Else return JSON as list + return _res.get('runningTests', [_res]) + + def delete_running_tests(self, running_test_id=None): + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.get_response_params('delete', _res_name) + LOG.info("Deleted running test with id: %s", running_test_id) + + def _running_tests_action(self, running_test_id, action, json_data=None): + if not json_data: + json_data = {} + # Supported actions: + # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.exec_rest_request('post', _res_name, {'action': action}, + json_data) + LOG.debug("Executed action: '%s' on running test id: %s", action, + running_test_id) + + def stop_running_tests(self, running_test_id, json_data=None, force=False): + _action = 'abort' if force else 'stop' + self._running_tests_action(running_test_id, _action, + json_data=json_data) + LOG.info('Performed action: "%s" to test run with id: %s', _action, + running_test_id) + + def check_running_test_state(self, run_id): + r = self.exec_rest_request('get', + '{}/{}'.format(self.running_tests_uri, + run_id)) + return r.json().get("testStateOrStep") + + def get_running_tests_results(self, run_id): + _res = self.exec_rest_request( + 'get', + '{}/{}/{}'.format(self.running_tests_uri, + run_id, + 'measurements')).json() + return _res + + def _write_results(self, results): + # Avoid None value at test session start + _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0 + + _res_tabs = results.get('tabs') + # Avoid parsing 'tab' dict key initially (missing or empty) + if not _res_tabs: + return + + # Flatten nested dict holding Landslide KPIs of current test run + flat_kpis_dict = {} + for _tab, _kpis in six.iteritems(_res_tabs): + for _kpi, _value in six.iteritems(_kpis): + # Combine table name and KPI name using delimiter "::" + _key = '::'.join([_tab, _kpi]) + try: + # Cast value from str to float + # Remove comma and/or measure units, e.g. "us" + flat_kpis_dict[_key] = float( + _value.split(' ')[0].replace(',', '')) + except ValueError: # E.g. if KPI represents datetime + pass + LOG.info("Polling test results of test run id: %s. Elapsed time: %s " + "seconds", self.run_id, _elapsed_time) + return flat_kpis_dict + + def collect_kpi(self): + if 'COMPLETE' in self.check_running_test_state(self.run_id): + self._result.update({'done': True}) + return self._result + _res = self.get_running_tests_results(self.run_id) + _kpis = self._write_results(_res) + if _kpis: + _kpis.update({'run_id': int(self.run_id)}) + _kpis.update({'iteration': _res['iteration']}) + self._result.update(_kpis) + return self._result + + +class LandslideTclClient(object): + """Landslide TG TCL client class""" + + DEFAULT_TEST_NODE = { + 'ethStatsEnabled': True, + 'forcedEthInterface': '', + 'innerVlanId': 0, + 'ip': '', + 'mac': '', + 'mtu': 1500, + 'nextHop': '', + 'numLinksOrNodes': 1, + 'numVlan': 1, + 'phy': '', + 'uniqueVlanAddr': False, + 'vlanDynamic': 0, + 'vlanId': 0, + 'vlanUserPriority': 0, + 'vlanTagType': 0 + } + + TEST_NODE_CMD = \ + 'ls::create -TestNode-{} -under $p_ -Type "eth"' \ + ' -Phy "{phy}" -Ip "{ip}" -NumLinksOrNodes {numLinksOrNodes}' \ + ' -NextHop "{nextHop}" -Mac "{mac}" -MTU {mtu}' \ + ' -ForcedEthInterface "{forcedEthInterface}"' \ + ' -EthStatsEnabled {ethStatsEnabled}' \ + ' -VlanId {vlanId} -VlanUserPriority {vlanUserPriority}' \ + ' -NumVlan {numVlan} -UniqueVlanAddr {uniqueVlanAddr}' \ + ';' + + def __init__(self, tcl_handler, ts_context): + self.tcl_server_ip = None + self._user = None + self._library_id = None + self._basic_library_id = None + self._tcl = tcl_handler + self._ts_context = ts_context + self.ts_ids = set() + + # Test types names expected in session profile, test case and pod files + self._tc_types = {"SGW_Nodal", "SGW_Node", "MME_Nodal", "PGW_Node", + "PCRF_Node"} + + self._class_param_config_handler = { + "Array": self._configure_array_param, + "TestNode": self._configure_test_node_param, + "Sut": self._configure_sut_param, + "Dmf": self._configure_dmf_param + } + + def connect(self, tcl_server_ip, username, password): + """ Connect to TCL server with username and password + + :param tcl_server_ip: TCL server IP address + :type tcl_server_ip: str + :param username: existing username on TCL server + :type username: str + :param password: password related to username on TCL server + :type password: str + """ + LOG.info("connect: server='%s' user='%s'", tcl_server_ip, username) + res = self._tcl.execute( + "ls::login {} {} {}".format(tcl_server_ip, username, password)) + if 'java0x' not in res: # handle assignment reflects login success + raise exceptions.LandslideTclException( + "connect: login failed ='{}'.".format(res)) + self._library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format( + username)) + self._basic_library_id = self._get_library_id('Basic') + self.tcl_server_ip = tcl_server_ip + self._user = username + LOG.debug("connect: user='%s' me='%s' basic='%s'", self._user, + self._library_id, + self._basic_library_id) + + def disconnect(self): + """ Disconnect from TCL server. Drop TCL connection configuration """ + LOG.info("disconnect: server='%s' user='%s'", + self.tcl_server_ip, self._user) + self._tcl.execute("ls::logout") + self.tcl_server_ip = None + self._user = None + self._library_id = None + self._basic_library_id = None + + def _add_test_server(self, name, ip): + try: + # Check if test server exists with name equal to _ts_name + ts_id = int(self.resolve_test_server_name(name)) + except ValueError: + # Such test server does not exist. Attempt to create it + ts_id = self._tcl.execute( + 'ls::perform AddTs -Name "{}" -Ip "{}"'.format(name, ip)) + try: + int(ts_id) + except ValueError: + # Failed to create test server, e.g. limit reached + raise RuntimeError( + 'Failed to create test server: "{}". {}'.format(name, + ts_id)) + return ts_id + + def _update_license(self, name): + """ Setup/update test server license + + :param name: test server name + :type name: str + """ + # Retrieve current TsInfo configuration, result stored in handle "ts" + self._tcl.execute( + 'set ts [ls::retrieve TsInfo -Name "{}"]'.format(name)) + + # Set license ID, if it differs from current one, update test server + _curr_lic_id = self._tcl.execute('ls::get $ts -RequestedLicense') + if _curr_lic_id != self._ts_context.license_data['lic_id']: + self._tcl.execute('ls::config $ts -RequestedLicense {}'.format( + self._ts_context.license_data['lic_id'])) + self._tcl.execute('ls::perform ModifyTs $ts') + + def _set_thread_model(self, name, thread_model): + # Retrieve test server configuration, store it in handle "tsc" + _cfguser_password = self._ts_context.vnfd_helper['mgmt-interface'][ + 'cfguser_password'] + self._tcl.execute( + 'set tsc [ls::perform RetrieveTsConfiguration ' + '-name "{}" {}]'.format(name, _cfguser_password)) + # Configure ThreadModel, if it differs from current one + thread_model_map = {'Legacy': 'V0', + 'Max': 'V1', + 'Fireball': 'V1_FB3'} + _model = thread_model_map[thread_model] + _curr_model = self._tcl.execute('ls::get $tsc -ThreadModel') + if _curr_model != _model: + self._tcl.execute( + 'ls::config $tsc -ThreadModel "{}"'.format(_model)) + self._tcl.execute( + 'ls::perform ApplyTsConfiguration $tsc {}'.format( + _cfguser_password)) + + def create_test_server(self, test_server): + _ts_thread_model = test_server.get('thread_model') + _ts_name = test_server['name'] + + ts_id = self._add_test_server(_ts_name, test_server['ip']) + + self._update_license(_ts_name) + + # Skip below code modifying thread_model if it is not defined + if _ts_thread_model: + self._set_thread_model(_ts_name, _ts_thread_model) + + return ts_id + + def create_test_session(self, test_session): + """ Create, configure and save Landslide test session object. + + :param test_session: Landslide TestSession object + :type test_session: dict + """ + LOG.info("create_test_session: name='%s'", test_session['name']) + self._tcl.execute('set test_ [ls::create TestSession]') + self._tcl.execute('ls::config $test_ -Library {} -Name "{}"'.format( + self._library_id, test_session['name'])) + self._tcl.execute('ls::config $test_ -Description "{}"'.format( + test_session['description'])) + if 'keywords' in test_session: + self._tcl.execute('ls::config $test_ -Keywords "{}"'.format( + test_session['keywords'])) + if 'duration' in test_session: + self._tcl.execute('ls::config $test_ -Duration "{}"'.format( + test_session['duration'])) + if 'iterations' in test_session: + self._tcl.execute('ls::config $test_ -Iterations "{}"'.format( + test_session['iterations'])) + if 'reservePorts' in test_session: + if test_session['reservePorts'] == 'true': + self._tcl.execute('ls::config $test_ -Reserve Ports') + + if 'reservations' in test_session: + for _reservation in test_session['reservations']: + self._configure_reservation(_reservation) + + if 'reportOptions' in test_session: + self._configure_report_options(test_session['reportOptions']) + + for _index, _group in enumerate(test_session['tsGroups']): + self._configure_ts_group(_group, _index) + + self._save_test_session() + + def create_dmf(self, dmf): + """ Create, configure and save Landslide Data Message Flow object. + + :param dmf: Landslide Data Message Flow object + :type: dmf: dict + """ + self._tcl.execute('set dmf_ [ls::create Dmf]') + _lib_id = self._get_library_id(dmf['dmf']['library']) + self._tcl.execute('ls::config $dmf_ -Library {} -Name "{}"'.format( + _lib_id, + dmf['dmf']['name'])) + for _param_key in dmf: + if _param_key == 'dmf': + continue + _param_value = dmf[_param_key] + if isinstance(_param_value, dict): + # Configure complex parameter + _tcl_cmd = 'ls::config $dmf_' + for _sub_param_key in _param_value: + _sub_param_value = _param_value[_sub_param_key] + if isinstance(_sub_param_value, str): + _tcl_cmd += ' -{} "{}"'.format(_sub_param_key, + _sub_param_value) + else: + _tcl_cmd += ' -{} {}'.format(_sub_param_key, + _sub_param_value) + + self._tcl.execute(_tcl_cmd) + else: + # Configure simple parameter + if isinstance(_param_value, str): + self._tcl.execute( + 'ls::config $dmf_ -{} "{}"'.format(_param_key, + _param_value)) + else: + self._tcl.execute( + 'ls::config $dmf_ -{} {}'.format(_param_key, + _param_value)) + self._save_dmf() + + def configure_dmf(self, dmf): + # Use create to reconfigure and overwrite existing dmf + self.create_dmf(dmf) + + def delete_dmf(self, dmf): + raise NotImplementedError + + def _save_dmf(self): + # Call 'Validate' to set default values for missing parameters + res = self._tcl.execute('ls::perform Validate -Dmf $dmf_') + if res == 'Invalid': + res = self._tcl.execute('ls::get $dmf_ -ErrorsAndWarnings') + LOG.error("_save_dmf: %s", res) + raise exceptions.LandslideTclException("_save_dmf: {}".format(res)) + else: + res = self._tcl.execute('ls::save $dmf_ -overwrite') + LOG.debug("_save_dmf: result (%s)", res) + + def _configure_report_options(self, options): + for _option_key in options: + _option_value = options[_option_key] + if _option_key == 'format': + _format = 0 + if _option_value == 'CSV': + _format = 1 + self._tcl.execute( + 'ls::config $test_.ReportOptions -Format {} ' + '-Ts -3 -Tc -3'.format(_format)) + else: + self._tcl.execute( + 'ls::config $test_.ReportOptions -{} {}'.format( + _option_key, + _option_value)) + + def _configure_ts_group(self, ts_group, ts_group_index): + try: + _ts_id = int(self.resolve_test_server_name(ts_group['tsId'])) + except ValueError: + raise RuntimeError('Test server name "{}" does not exist.'.format( + ts_group['tsId'])) + if _ts_id not in self.ts_ids: + self._tcl.execute( + 'set tss_ [ls::create TsGroup -under $test_ -tsId {} ]'.format( + _ts_id)) + self.ts_ids.add(_ts_id) + for _case in ts_group.get('testCases', []): + self._configure_tc_type(_case, ts_group_index) + + self._configure_preresolved_arp(ts_group.get('preResolvedArpAddress')) + + def _configure_tc_type(self, tc, ts_group_index): + if tc['type'] not in self._tc_types: + raise RuntimeError('Test type {} not supported.'.format( + tc['type'])) + tc['type'] = tc['type'].replace('_', ' ') + res = self._tcl.execute( + 'set tc_ [ls::retrieve testcase -libraryId {0} "{1}"]'.format( + self._basic_library_id, tc['type'])) + if 'Invalid' in res: + raise RuntimeError('Test type {} not found in "Basic" ' + 'library.'.format(tc['type'])) + self._tcl.execute( + 'ls::config $test_.TsGroup({}) -children-Tc $tc_'.format( + ts_group_index)) + self._tcl.execute('ls::config $tc_ -Library {0} -Name "{1}"'.format( + self._basic_library_id, tc['name'])) + self._tcl.execute( + 'ls::config $tc_ -Description "{}"'.format(tc['type'])) + self._tcl.execute( + 'ls::config $tc_ -Keywords "GTP LTE {}"'.format(tc['type'])) + if 'linked' in tc: + self._tcl.execute( + 'ls::config $tc_ -Linked {}'.format(tc['linked'])) + if 'AssociatedPhys' in tc: + self._tcl.execute('ls::config $tc_ -AssociatedPhys "{}"'.format( + tc['AssociatedPhys'])) + if 'parameters' in tc: + self._configure_parameters(tc['parameters']) + + def _configure_parameters(self, params): + self._tcl.execute('set p_ [ls::get $tc_ -children-Parameters(0)]') + for _param_key in sorted(params): + _param_value = params[_param_key] + if isinstance(_param_value, dict): + # Configure complex parameter + if _param_value['class'] in self._class_param_config_handler: + self._class_param_config_handler[_param_value['class']]( + _param_key, + _param_value) + else: + # Configure simple parameter + self._tcl.execute( + 'ls::create {} -under $p_ -Value "{}"'.format( + _param_key, + _param_value)) + + def _configure_array_param(self, name, params): + self._tcl.execute('ls::create -Array-{} -under $p_ ;'.format(name)) + for param in params['array']: + self._tcl.execute( + 'ls::create ArrayItem -under $p_.{} -Value "{}"'.format(name, + param)) + + def _configure_test_node_param(self, name, params): + _params = self.DEFAULT_TEST_NODE + _params.update(params) + + # TCL command expects lower case 'true' or 'false' + _params['ethStatsEnabled'] = str(_params['ethStatsEnabled']).lower() + _params['uniqueVlanAddr'] = str(_params['uniqueVlanAddr']).lower() + + cmd = self.TEST_NODE_CMD.format(name, **_params) + self._tcl.execute(cmd) + + def _configure_sut_param(self, name, params): + self._tcl.execute( + 'ls::create -Sut-{} -under $p_ -Name "{}";'.format(name, + params['name'])) + + def _configure_dmf_param(self, name, params): + self._tcl.execute('ls::create -Dmf-{} -under $p_ ;'.format(name)) + + for _flow_index, _flow in enumerate(params['mainflows']): + _lib_id = self._get_library_id(_flow['library']) + self._tcl.execute( + 'ls::perform AddDmfMainflow $p_.Dmf {} "{}"'.format( + _lib_id, + _flow['name'])) + + if not params.get('instanceGroups'): + return + + _instance_group = params['instanceGroups'][_flow_index] + + # Traffic Mixer parameters handling + for _key in ['mixType', 'rate']: + if _key in _instance_group: + self._tcl.execute( + 'ls::config $p_.Dmf.InstanceGroup({}) -{} {}'.format( + _flow_index, _key, _instance_group[_key])) + + # Assignments parameters handling + for _row_id, _row in enumerate(_instance_group.get('rows', [])): + self._tcl.execute( + 'ls::config $p_.Dmf.InstanceGroup({}).Row({}) -Node {} ' + '-OverridePort {} -ClientPort {} -Context {} -Role {} ' + '-PreferredTransport {} -RatingGroup {} ' + '-ServiceID {}'.format( + _flow_index, _row_id, _row['node'], + _row['overridePort'], _row['clientPort'], + _row['context'], _row['role'], _row['transport'], + _row['ratingGroup'], _row['serviceId'])) + + def _configure_reservation(self, reservation): + _ts_id = self.resolve_test_server_name(reservation['tsId']) + self._tcl.execute( + 'set reservation_ [ls::create Reservation -under $test_]') + self._tcl.execute( + 'ls::config $reservation_ -TsIndex {} -TsId {} ' + '-TsName "{}"'.format(reservation['tsIndex'], + _ts_id, + reservation['tsName'])) + for _subnet in reservation['phySubnets']: + self._tcl.execute( + 'set physubnet_ [ls::create PhySubnet -under $reservation_]') + self._tcl.execute( + 'ls::config $physubnet_ -Name "{}" -Base "{}" -Mask "{}" ' + '-NumIps {}'.format(_subnet['name'], _subnet['base'], + _subnet['mask'], _subnet['numIps'])) + + def _configure_preresolved_arp(self, pre_resolved_arp): + if not pre_resolved_arp: # Pre-resolved ARP configuration not found + return + for _entry in pre_resolved_arp: + # TsGroup handle name should correspond in _configure_ts_group() + self._tcl.execute( + 'ls::create PreResolvedArpAddress -under $tss_ ' + '-StartingAddress "{StartingAddress}" ' + '-NumNodes {NumNodes}'.format(**_entry)) + + def delete_test_session(self, test_session): + raise NotImplementedError + + def _save_test_session(self): + # Call 'Validate' to set default values for missing parameters + res = self._tcl.execute('ls::perform Validate -TestSession $test_') + if res == 'Invalid': + res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings') + raise exceptions.LandslideTclException( + "Test session validation failed. Server response: {}".format( + res)) + else: + self._tcl.execute('ls::save $test_ -overwrite') + LOG.debug("Test session saved successfully.") + + def _get_library_id(self, library): + _library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -systemLibraryName {}] -Id".format( + library)) + try: + int(_library_id) + return _library_id + except ValueError: + pass + + _library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format( + library)) + try: + int(_library_id) + except ValueError: + LOG.error("_get_library_id: library='%s' not found.", library) + raise exceptions.LandslideTclException( + "_get_library_id: library='{}' not found.".format( + library)) + + return _library_id + + def resolve_test_server_name(self, ts_name): + return self._tcl.execute("ls::query TsId {}".format(ts_name)) + + +class LsTclHandler(object): + """Landslide TCL Handler class""" + + LS_OK = "ls_ok" + JRE_PATH = net_serv_utils.get_nsb_option('jre_path_i386') + + def __init__(self): + self.tcl_cmds = {} + self._ls = LsApi(jre_path=self.JRE_PATH) + self._ls.tcl( + "ls::config ApiOptions -NoReturnSuccessResponseString '{}'".format( + self.LS_OK)) + + def execute(self, command): + res = self._ls.tcl(command) + self.tcl_cmds[command] = res + return res diff --git a/yardstick/network_services/vnf_generic/vnf/tg_prox.py b/yardstick/network_services/vnf_generic/vnf/tg_prox.py index 854319a21..d12c42ec8 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_prox.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_prox.py @@ -12,9 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import - import logging +import copy from yardstick.network_services.utils import get_nsb_option from yardstick.network_services.vnf_generic.vnf.prox_vnf import ProxApproxVnf @@ -32,7 +31,9 @@ class ProxTrafficGen(SampleVNFTrafficGen): def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, resource_helper_type=None): - # don't call superclass, use custom wrapper of ProxApproxVnf + vnfd_cpy = copy.deepcopy(vnfd) + super(ProxTrafficGen, self).__init__(name, vnfd_cpy, task_id) + self._vnf_wrapper = ProxApproxVnf( name, vnfd, task_id, setup_env_helper_type, resource_helper_type) self.bin_path = get_nsb_option('bin_path', '') diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py index 4d3bc2ce5..558a62935 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py @@ -60,7 +60,7 @@ class IxiaResourceHelper(ClientResourceHelper): def stop_collect(self): self._terminated.value = 1 - def generate_samples(self, ports, key=None): + def generate_samples(self, ports, duration): stats = self.get_stats() samples = {} @@ -70,27 +70,23 @@ class IxiaResourceHelper(ClientResourceHelper): try: # reverse lookup port name from port_num so the stats dict is descriptive intf = self.vnfd_helper.find_interface_by_port(port_num) - port_name = intf["name"] + port_name = intf['name'] + avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num] + min_latency = stats['Store-Forward_Min_latency_ns'][port_num] + max_latency = stats['Store-Forward_Max_latency_ns'][port_num] samples[port_name] = { - "rx_throughput_kps": float(stats["Rx_Rate_Kbps"][port_num]), - "tx_throughput_kps": float(stats["Tx_Rate_Kbps"][port_num]), - "rx_throughput_mbps": float(stats["Rx_Rate_Mbps"][port_num]), - "tx_throughput_mbps": float(stats["Tx_Rate_Mbps"][port_num]), - "in_packets": int(stats["Valid_Frames_Rx"][port_num]), - "out_packets": int(stats["Frames_Tx"][port_num]), - # NOTE(ralonsoh): we need to make the traffic injection - # time variable. - "RxThroughput": int(stats["Valid_Frames_Rx"][port_num]) / 30, - "TxThroughput": int(stats["Frames_Tx"][port_num]) / 30, + 'rx_throughput_kps': float(stats['Rx_Rate_Kbps'][port_num]), + 'tx_throughput_kps': float(stats['Tx_Rate_Kbps'][port_num]), + 'rx_throughput_mbps': float(stats['Rx_Rate_Mbps'][port_num]), + 'tx_throughput_mbps': float(stats['Tx_Rate_Mbps'][port_num]), + 'in_packets': int(stats['Valid_Frames_Rx'][port_num]), + 'out_packets': int(stats['Frames_Tx'][port_num]), + 'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration, + 'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration, + 'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0), + 'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0), + 'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0) } - if key: - avg_latency = stats["Store-Forward_Avg_latency_ns"][port_num] - min_latency = stats["Store-Forward_Min_latency_ns"][port_num] - max_latency = stats["Store-Forward_Max_latency_ns"][port_num] - samples[port_name][key] = \ - {"Store-Forward_Avg_latency_ns": avg_latency, - "Store-Forward_Min_latency_ns": min_latency, - "Store-Forward_Max_latency_ns": max_latency} except IndexError: pass @@ -100,7 +96,10 @@ class IxiaResourceHelper(ClientResourceHelper): """Initialize the IXIA IxNetwork client and configure the server""" self.client.clear_config() self.client.assign_ports() - self.client.create_traffic_model() + vports = self.client.get_vports() + uplink_vports = vports[::2] + downlink_vports = vports[1::2] + self.client.create_traffic_model(uplink_vports, downlink_vports) def run_traffic(self, traffic_profile, *args): if self._terminated.value: @@ -129,13 +128,11 @@ class IxiaResourceHelper(ClientResourceHelper): self, self.client, mac) self.client_started.value = 1 # pylint: disable=unnecessary-lambda - utils.wait_until_true(lambda: self.client.is_traffic_stopped()) - samples = self.generate_samples(traffic_profile.ports) + utils.wait_until_true(lambda: self.client.is_traffic_stopped(), + timeout=traffic_profile.config.duration * 2) + samples = self.generate_samples(traffic_profile.ports, + traffic_profile.config.duration) - # NOTE(ralonsoh): the traffic injection duration is fixed to 30 - # seconds. This parameter is configurable and must be retrieved - # from the traffic_profile.full_profile information. - # Every flow must have the same duration. completed, samples = traffic_profile.get_drop_percentage( samples, min_tol, max_tol, first_run=first_run) self._queue.put(samples) diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py index 58b73488b..4296da84c 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py @@ -11,8 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Trex acts as traffic generation and vnf definitions based on IETS Spec """ +import datetime import logging import os @@ -167,6 +167,7 @@ class TrexResourceHelper(ClientResourceHelper): def _get_samples(self, ports, port_pg_id=None): stats = self.get_stats(ports) + timestamp = datetime.datetime.now() samples = {} for pname in (intf['name'] for intf in self.vnfd_helper.interfaces): port_num = self.vnfd_helper.port_num(pname) @@ -178,6 +179,7 @@ class TrexResourceHelper(ClientResourceHelper): 'tx_throughput_bps': float(port_stats.get('tx_bps', 0.0)), 'in_packets': int(port_stats.get('ipackets', 0)), 'out_packets': int(port_stats.get('opackets', 0)), + 'timestamp': timestamp } pg_id_list = port_pg_id.get_pg_ids(port_num) |