diff options
Diffstat (limited to 'yardstick/network_services')
45 files changed, 4605 insertions, 411 deletions
diff --git a/yardstick/network_services/helpers/cpu.py b/yardstick/network_services/helpers/cpu.py index 8c21754ff..279af204a 100644 --- a/yardstick/network_services/helpers/cpu.py +++ b/yardstick/network_services/helpers/cpu.py @@ -15,11 +15,15 @@ import io +# Number of threads per core. +NR_OF_THREADS = 2 + class CpuSysCores(object): def __init__(self, connection=""): self.core_map = {} + self.cpuinfo = {} self.connection = connection def _open_cpuinfo(self): @@ -33,11 +37,11 @@ class CpuSysCores(object): core_lines = {} for line in lines: if line.strip(): - name, value = line.split(":", 1) - core_lines[name.strip()] = value.strip() + name, value = line.split(":", 1) + core_lines[name.strip()] = value.strip() else: - core_details.append(core_lines) - core_lines = {} + core_details.append(core_lines) + core_lines = {} return core_details @@ -51,7 +55,7 @@ class CpuSysCores(object): lines = self._open_cpuinfo() core_details = self._get_core_details(lines) for core in core_details: - for k, v in core.items(): + for k, _ in core.items(): if k == "physical id": if core["physical id"] not in self.core_map: self.core_map[core['physical id']] = [] @@ -60,6 +64,17 @@ class CpuSysCores(object): return self.core_map + def get_cpu_layout(self): + _, stdout, _ = self.connection.execute("lscpu -p") + self.cpuinfo = {} + self.cpuinfo['cpuinfo'] = list() + for line in stdout.split("\n"): + if line and line[0] != "#": + self.cpuinfo['cpuinfo'].append( + [CpuSysCores._str2int(x) for x in + line.split(",")]) + return self.cpuinfo + def validate_cpu_cfg(self, vnf_cfg=None): if vnf_cfg is None: vnf_cfg = { @@ -78,3 +93,81 @@ class CpuSysCores(object): return -1 return 0 + + def is_smt_enabled(self): + return CpuSysCores.smt_enabled(self.cpuinfo) + + def cpu_list_per_node(self, cpu_node, smt_used=False): + cpu_node = int(cpu_node) + cpu_info = self.cpuinfo.get("cpuinfo") + if cpu_info is None: + raise RuntimeError("Node cpuinfo not available.") + + smt_enabled = self.is_smt_enabled() + if not smt_enabled and smt_used: + raise RuntimeError("SMT is not enabled.") + + cpu_list = [] + for cpu in cpu_info: + if cpu[3] == cpu_node: + cpu_list.append(cpu[0]) + + if not smt_enabled or smt_enabled and smt_used: + pass + + if smt_enabled and not smt_used: + cpu_list_len = len(cpu_list) + cpu_list = cpu_list[:int(cpu_list_len / NR_OF_THREADS)] + + return cpu_list + + def cpu_slice_of_list_per_node(self, cpu_node, skip_cnt=0, cpu_cnt=0, + smt_used=False): + cpu_list = self.cpu_list_per_node(cpu_node, smt_used) + + cpu_list_len = len(cpu_list) + if cpu_cnt + skip_cnt > cpu_list_len: + raise RuntimeError("cpu_cnt + skip_cnt > length(cpu list).") + + if cpu_cnt == 0: + cpu_cnt = cpu_list_len - skip_cnt + + if smt_used: + cpu_list_0 = cpu_list[:int(cpu_list_len / NR_OF_THREADS)] + cpu_list_1 = cpu_list[int(cpu_list_len / NR_OF_THREADS):] + cpu_list = [cpu for cpu in cpu_list_0[skip_cnt:skip_cnt + cpu_cnt]] + cpu_list_ex = [cpu for cpu in + cpu_list_1[skip_cnt:skip_cnt + cpu_cnt]] + cpu_list.extend(cpu_list_ex) + else: + cpu_list = [cpu for cpu in cpu_list[skip_cnt:skip_cnt + cpu_cnt]] + + return cpu_list + + def cpu_list_per_node_str(self, cpu_node, skip_cnt=0, cpu_cnt=0, sep=",", + smt_used=False): + cpu_list = self.cpu_slice_of_list_per_node(cpu_node, + skip_cnt=skip_cnt, + cpu_cnt=cpu_cnt, + smt_used=smt_used) + return sep.join(str(cpu) for cpu in cpu_list) + + @staticmethod + def _str2int(string): + try: + return int(string) + except ValueError: + return 0 + + @staticmethod + def smt_enabled(cpuinfo): + cpu_info = cpuinfo.get("cpuinfo") + if cpu_info is None: + raise RuntimeError("Node cpuinfo not available.") + cpu_mems = [item[-4:] for item in cpu_info] + cpu_mems_len = int(len(cpu_mems) / NR_OF_THREADS) + count = 0 + for cpu_mem in cpu_mems[:cpu_mems_len]: + if cpu_mem in cpu_mems[cpu_mems_len:]: + count += 1 + return count == cpu_mems_len diff --git a/yardstick/network_services/helpers/dpdkbindnic_helper.py b/yardstick/network_services/helpers/dpdkbindnic_helper.py index 1c74355ef..33a5e8c1d 100644 --- a/yardstick/network_services/helpers/dpdkbindnic_helper.py +++ b/yardstick/network_services/helpers/dpdkbindnic_helper.py @@ -13,7 +13,6 @@ # limitations under the License. import logging import os - import re from collections import defaultdict from itertools import chain @@ -21,7 +20,6 @@ from itertools import chain from yardstick.common import exceptions from yardstick.common.utils import validate_non_string_sequence - NETWORK_KERNEL = 'network_kernel' NETWORK_DPDK = 'network_dpdk' NETWORK_OTHER = 'network_other' @@ -284,15 +282,22 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ res = self.ssh_helper.execute(*args, **kwargs) if res[0] != 0: template = '{} command failed with rc={}' + LOG.critical("DPDK_DEVBIND Failure %s", res[1]) raise DpdkBindHelperException(template.format(self.dpdk_devbind, res[0])) return res - def load_dpdk_driver(self): + def load_dpdk_driver(self, dpdk_driver=None): + if dpdk_driver is None: + dpdk_driver = self.dpdk_driver cmd_template = "sudo modprobe {} && sudo modprobe {}" - self.ssh_helper.execute(cmd_template.format(self.UIO_DRIVER, self.dpdk_driver)) - - def check_dpdk_driver(self): - return self.ssh_helper.execute("lsmod | grep -i {}".format(self.dpdk_driver))[0] + self.ssh_helper.execute( + cmd_template.format(self.UIO_DRIVER, dpdk_driver)) + + def check_dpdk_driver(self, dpdk_driver=None): + if dpdk_driver is None: + dpdk_driver = self.dpdk_driver + return \ + self.ssh_helper.execute("lsmod | grep -i {}".format(dpdk_driver))[0] @property def _status_cmd(self): diff --git a/yardstick/network_services/helpers/vpp_helpers/__init__.py b/yardstick/network_services/helpers/vpp_helpers/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/yardstick/network_services/helpers/vpp_helpers/__init__.py diff --git a/yardstick/network_services/helpers/vpp_helpers/abstract_search_algorithm.py b/yardstick/network_services/helpers/vpp_helpers/abstract_search_algorithm.py new file mode 100644 index 000000000..fced05833 --- /dev/null +++ b/yardstick/network_services/helpers/vpp_helpers/abstract_search_algorithm.py @@ -0,0 +1,53 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a modified copy of +# https://gerrit.fd.io/r/gitweb?p=csit.git;a=blob_plain;f=resources/libraries/python/MLRsearch/AbstractSearchAlgorithm.py;hb=HEAD + + +from abc import ABCMeta, abstractmethod + + +class AbstractSearchAlgorithm(object): + """Abstract class defining common API for search algorithms.""" + + __metaclass__ = ABCMeta + + def __init__(self, measurer): + """Store the rate provider. + + :param measurer: Object able to perform trial or composite measurements. + :type measurer: AbstractMeasurer.AbstractMeasurer + """ + # TODO: Type check for AbstractMeasurer? + self.measurer = measurer + + @abstractmethod + def narrow_down_ndr_and_pdr( + self, fail_rate, line_rate, packet_loss_ratio): + """Perform measurements to narrow down intervals, return them. + + This will be renamed when custom loss ratio lists are supported. + + :param fail_rate: Minimal target transmit rate [pps]. + :param line_rate: Maximal target transmit rate [pps]. + :param packet_loss_ratio: Fraction of packets lost, for PDR [1]. + :type fail_rate: float + :type line_rate: float + :type packet_loss_ratio: float + :returns: Structure containing narrowed down intervals + and their measurements. + :rtype: NdrPdrResult.NdrPdrResult + """ + # TODO: Do we agree on arguments related to precision or trial duration? diff --git a/yardstick/network_services/helpers/vpp_helpers/multiple_loss_ratio_search.py b/yardstick/network_services/helpers/vpp_helpers/multiple_loss_ratio_search.py new file mode 100644 index 000000000..582e3dc27 --- /dev/null +++ b/yardstick/network_services/helpers/vpp_helpers/multiple_loss_ratio_search.py @@ -0,0 +1,688 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a modified copy of +# https://gerrit.fd.io/r/gitweb?p=csit.git;a=blob_plain;f=resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py;hb=HEAD + +import datetime +import logging +import math +import time + +from yardstick.network_services.helpers.vpp_helpers.abstract_search_algorithm import \ + AbstractSearchAlgorithm +from yardstick.network_services.helpers.vpp_helpers.ndr_pdr_result import \ + NdrPdrResult +from yardstick.network_services.helpers.vpp_helpers.receive_rate_interval import \ + ReceiveRateInterval +from yardstick.network_services.helpers.vpp_helpers.receive_rate_measurement import \ + ReceiveRateMeasurement + +LOGGING = logging.getLogger(__name__) + + +class MultipleLossRatioSearch(AbstractSearchAlgorithm): + """Optimized binary search algorithm for finding NDR and PDR bounds. + + Traditional binary search algorithm needs initial interval + (lower and upper bound), and returns final interval after bisecting + (until some exit condition is met). + The exit condition is usually related to the interval width, + (upper bound value minus lower bound value). + + The optimized algorithm contains several improvements + aimed to reduce overall search time. + + One improvement is searching for two intervals at once. + The intervals are for NDR (No Drop Rate) and PDR (Partial Drop Rate). + + Next improvement is that the initial interval does not need to be valid. + Imagine initial interval (10, 11) where 11 is smaller + than the searched value. + The algorithm will try (11, 13) interval next, and if 13 is still smaller, + (13, 17) and so on, doubling width until the upper bound is valid. + The part when interval expands is called external search, + the part when interval is bisected is called internal search. + + Next improvement is that trial measurements at small trial duration + can be used to find a reasonable interval for full trial duration search. + This results in more trials performed, but smaller overall duration + in general. + + Next improvement is bisecting in logarithmic quantities, + so that exit criteria can be independent of measurement units. + + Next improvement is basing the initial interval on receive rates. + + Final improvement is exiting early if the minimal value + is not a valid lower bound. + + The complete search consist of several phases, + each phase performing several trial measurements. + Initial phase creates initial interval based on receive rates + at maximum rate and at maximum receive rate (MRR). + Final phase and preceding intermediate phases are performing + external and internal search steps, + each resulting interval is the starting point for the next phase. + The resulting interval of final phase is the result of the whole algorithm. + + Each non-initial phase uses its own trial duration and width goal. + Any non-initial phase stops searching (for NDR or PDR independently) + when minimum is not a valid lower bound (at current duration), + or all of the following is true: + Both bounds are valid, bound bounds are measured at the current phase + trial duration, interval width is less than the width goal + for current phase. + + TODO: Review and update this docstring according to rst docs. + TODO: Support configurable number of Packet Loss Ratios. + """ + + class ProgressState(object): + """Structure containing data to be passed around in recursion.""" + + def __init__( + self, result, phases, duration, width_goal, packet_loss_ratio, + minimum_transmit_rate, maximum_transmit_rate): + """Convert and store the argument values. + + :param result: Current measured NDR and PDR intervals. + :param phases: How many intermediate phases to perform + before the current one. + :param duration: Trial duration to use in the current phase [s]. + :param width_goal: The goal relative width for the curreent phase. + :param packet_loss_ratio: PDR fraction for the current search. + :param minimum_transmit_rate: Minimum target transmit rate + for the current search [pps]. + :param maximum_transmit_rate: Maximum target transmit rate + for the current search [pps]. + :type result: NdrPdrResult.NdrPdrResult + :type phases: int + :type duration: float + :type width_goal: float + :type packet_loss_ratio: float + :type minimum_transmit_rate: float + :type maximum_transmit_rate: float + """ + self.result = result + self.phases = int(phases) + self.duration = float(duration) + self.width_goal = float(width_goal) + self.packet_loss_ratio = float(packet_loss_ratio) + self.minimum_transmit_rate = float(minimum_transmit_rate) + self.maximum_transmit_rate = float(maximum_transmit_rate) + + def __init__(self, measurer, latency=False, pkt_size=64, + final_relative_width=0.005, + final_trial_duration=30.0, initial_trial_duration=1.0, + number_of_intermediate_phases=2, timeout=600.0, doublings=1): + """Store the measurer object and additional arguments. + + :param measurer: Rate provider to use by this search object. + :param final_relative_width: Final lower bound transmit rate + cannot be more distant that this multiple of upper bound [1]. + :param final_trial_duration: Trial duration for the final phase [s]. + :param initial_trial_duration: Trial duration for the initial phase + and also for the first intermediate phase [s]. + :param number_of_intermediate_phases: Number of intermediate phases + to perform before the final phase [1]. + :param timeout: The search will fail itself when not finished + before this overall time [s]. + :param doublings: How many doublings to do in external search step. + Default 1 is suitable for fairly stable tests, + less stable tests might get better overal duration with 2 or more. + :type measurer: AbstractMeasurer.AbstractMeasurer + :type final_relative_width: float + :type final_trial_duration: float + :type initial_trial_duration: int + :type number_of_intermediate_phases: int + :type timeout: float + :type doublings: int + """ + super(MultipleLossRatioSearch, self).__init__(measurer) + self.latency = latency + self.pkt_size = int(pkt_size) + self.final_trial_duration = float(final_trial_duration) + self.final_relative_width = float(final_relative_width) + self.number_of_intermediate_phases = int(number_of_intermediate_phases) + self.initial_trial_duration = float(initial_trial_duration) + self.timeout = float(timeout) + self.doublings = int(doublings) + + self.queue = None + self.port_pg_id = None + self.ports = [] + self.test_data = {} + self.profiles = {} + + @staticmethod + def double_relative_width(relative_width): + """Return relative width corresponding to double logarithmic width. + + :param relative_width: The base relative width to double. + :type relative_width: float + :returns: The relative width of double logarithmic size. + :rtype: float + """ + return 1.999 * relative_width - relative_width * relative_width + # The number should be 2.0, but we want to avoid rounding errors, + # and ensure half of double is not larger than the original value. + + @staticmethod + def double_step_down(relative_width, current_bound): + """Return rate of double logarithmic width below. + + :param relative_width: The base relative width to double. + :param current_bound: The current target transmit rate to move [pps]. + :type relative_width: float + :type current_bound: float + :returns: Transmit rate smaller by logarithmically double width [pps]. + :rtype: float + """ + return current_bound * ( + 1.0 - MultipleLossRatioSearch.double_relative_width( + relative_width)) + + @staticmethod + def expand_down(relative_width, doublings, current_bound): + """Return rate of expanded logarithmic width below. + + :param relative_width: The base relative width to double. + :param doublings: How many doublings to do for expansion. + :param current_bound: The current target transmit rate to move [pps]. + :type relative_width: float + :type doublings: int + :type current_bound: float + :returns: Transmit rate smaller by logarithmically double width [pps]. + :rtype: float + """ + for _ in range(doublings): + relative_width = MultipleLossRatioSearch.double_relative_width( + relative_width) + return current_bound * (1.0 - relative_width) + + @staticmethod + def double_step_up(relative_width, current_bound): + """Return rate of double logarithmic width above. + + :param relative_width: The base relative width to double. + :param current_bound: The current target transmit rate to move [pps]. + :type relative_width: float + :type current_bound: float + :returns: Transmit rate larger by logarithmically double width [pps]. + :rtype: float + """ + return current_bound / ( + 1.0 - MultipleLossRatioSearch.double_relative_width( + relative_width)) + + @staticmethod + def expand_up(relative_width, doublings, current_bound): + """Return rate of expanded logarithmic width above. + + :param relative_width: The base relative width to double. + :param doublings: How many doublings to do for expansion. + :param current_bound: The current target transmit rate to move [pps]. + :type relative_width: float + :type doublings: int + :type current_bound: float + :returns: Transmit rate smaller by logarithmically double width [pps]. + :rtype: float + """ + for _ in range(doublings): + relative_width = MultipleLossRatioSearch.double_relative_width( + relative_width) + return current_bound / (1.0 - relative_width) + + @staticmethod + def half_relative_width(relative_width): + """Return relative width corresponding to half logarithmic width. + + :param relative_width: The base relative width to halve. + :type relative_width: float + :returns: The relative width of half logarithmic size. + :rtype: float + """ + return 1.0 - math.sqrt(1.0 - relative_width) + + @staticmethod + def half_step_up(relative_width, current_bound): + """Return rate of half logarithmic width above. + + :param relative_width: The base relative width to halve. + :param current_bound: The current target transmit rate to move [pps]. + :type relative_width: float + :type current_bound: float + :returns: Transmit rate larger by logarithmically half width [pps]. + :rtype: float + """ + return current_bound / ( + 1.0 - MultipleLossRatioSearch.half_relative_width( + relative_width)) + + def init_generator(self, ports, port_pg_id, profiles, test_data, queue): + self.ports = ports + self.port_pg_id = port_pg_id + self.profiles = profiles + self.test_data = test_data + self.queue = queue + self.queue.cancel_join_thread() + + def collect_kpi(self, stats, test_value): + samples = self.measurer.generate_samples(stats, self.ports, + self.port_pg_id, self.latency) + samples.update(self.test_data) + LOGGING.info("Collect TG KPIs %s %s %s", datetime.datetime.now(), + test_value, samples) + self.queue.put(samples) + + def narrow_down_ndr_and_pdr( + self, minimum_transmit_rate, maximum_transmit_rate, + packet_loss_ratio): + """Perform initial phase, create state object, proceed with next phases. + + :param minimum_transmit_rate: Minimal target transmit rate [pps]. + :param maximum_transmit_rate: Maximal target transmit rate [pps]. + :param packet_loss_ratio: Fraction of packets lost, for PDR [1]. + :type minimum_transmit_rate: float + :type maximum_transmit_rate: float + :type packet_loss_ratio: float + :returns: Structure containing narrowed down intervals + and their measurements. + :rtype: NdrPdrResult.NdrPdrResult + :raises RuntimeError: If total duration is larger than timeout. + """ + minimum_transmit_rate = float(minimum_transmit_rate) + maximum_transmit_rate = float(maximum_transmit_rate) + packet_loss_ratio = float(packet_loss_ratio) + line_measurement = self.measure( + self.initial_trial_duration, maximum_transmit_rate, self.latency) + initial_width_goal = self.final_relative_width + for _ in range(self.number_of_intermediate_phases): + initial_width_goal = self.double_relative_width(initial_width_goal) + max_lo = maximum_transmit_rate * (1.0 - initial_width_goal) + mrr = max( + minimum_transmit_rate, + min(max_lo, line_measurement.receive_rate)) + mrr_measurement = self.measure( + self.initial_trial_duration, mrr, self.latency) + # Attempt to get narrower width. + if mrr_measurement.loss_fraction > 0.0: + max2_lo = mrr * (1.0 - initial_width_goal) + mrr2 = min(max2_lo, mrr_measurement.receive_rate) + else: + mrr2 = mrr / (1.0 - initial_width_goal) + if mrr2 > minimum_transmit_rate and mrr2 < maximum_transmit_rate: + line_measurement = mrr_measurement + mrr_measurement = self.measure( + self.initial_trial_duration, mrr2, self.latency) + if mrr2 > mrr: + buf = line_measurement + line_measurement = mrr_measurement + mrr_measurement = buf + starting_interval = ReceiveRateInterval( + mrr_measurement, line_measurement) + starting_result = NdrPdrResult(starting_interval, starting_interval) + state = self.ProgressState( + starting_result, self.number_of_intermediate_phases, + self.final_trial_duration, self.final_relative_width, + packet_loss_ratio, minimum_transmit_rate, maximum_transmit_rate) + state = self.ndrpdr(state) + result = state.result + # theor_max_thruput = 0 + result_samples = {} + + MultipleLossRatioSearch.display_single_bound(result_samples, + 'NDR_LOWER', result.ndr_interval.measured_low.transmit_rate, + self.pkt_size, result.ndr_interval.measured_low.latency) + MultipleLossRatioSearch.display_single_bound(result_samples, + 'NDR_UPPER', result.ndr_interval.measured_high.transmit_rate, + self.pkt_size) + MultipleLossRatioSearch.display_single_bound(result_samples, + 'PDR_LOWER', result.pdr_interval.measured_low.transmit_rate, + self.pkt_size, result.pdr_interval.measured_low.latency) + MultipleLossRatioSearch.display_single_bound(result_samples, + 'PDR_UPPER', result.pdr_interval.measured_high.transmit_rate, + self.pkt_size) + pdr_msg = self.check_ndrpdr_interval_validity(result_samples, "PDR", + result.pdr_interval, + packet_loss_ratio) + ndr_msg = self.check_ndrpdr_interval_validity(result_samples, "NDR", + result.ndr_interval) + self.queue.put(result_samples) + + LOGGING.debug("result_samples: %s", result_samples) + LOGGING.info(pdr_msg) + LOGGING.info(ndr_msg) + + self.perform_additional_measurements_based_on_ndrpdr_result(result) + + return result_samples + + def _measure_and_update_state(self, state, transmit_rate): + """Perform trial measurement, update bounds, return new state. + + :param state: State before this measurement. + :param transmit_rate: Target transmit rate for this measurement [pps]. + :type state: ProgressState + :type transmit_rate: float + :returns: State after the measurement. + :rtype: ProgressState + """ + # TODO: Implement https://stackoverflow.com/a/24683360 + # to avoid the string manipulation if log verbosity is too low. + LOGGING.info("result before update: %s", state.result) + LOGGING.debug( + "relative widths in goals: %s", state.result.width_in_goals( + self.final_relative_width)) + measurement = self.measure(state.duration, transmit_rate, self.latency) + ndr_interval = self._new_interval( + state.result.ndr_interval, measurement, 0.0) + pdr_interval = self._new_interval( + state.result.pdr_interval, measurement, state.packet_loss_ratio) + state.result = NdrPdrResult(ndr_interval, pdr_interval) + return state + + @staticmethod + def _new_interval(old_interval, measurement, packet_loss_ratio): + """Return new interval with bounds updated according to the measurement. + + :param old_interval: The current interval before the measurement. + :param measurement: The new meaqsurement to take into account. + :param packet_loss_ratio: Fraction for PDR (or zero for NDR). + :type old_interval: ReceiveRateInterval.ReceiveRateInterval + :type measurement: ReceiveRateMeasurement.ReceiveRateMeasurement + :type packet_loss_ratio: float + :returns: The updated interval. + :rtype: ReceiveRateInterval.ReceiveRateInterval + """ + old_lo, old_hi = old_interval.measured_low, old_interval.measured_high + # Priority zero: direct replace if the target Tr is the same. + if measurement.target_tr in (old_lo.target_tr, old_hi.target_tr): + if measurement.target_tr == old_lo.target_tr: + return ReceiveRateInterval(measurement, old_hi) + else: + return ReceiveRateInterval(old_lo, measurement) + # Priority one: invalid lower bound allows only one type of update. + if old_lo.loss_fraction > packet_loss_ratio: + # We can only expand down, old bound becomes valid upper one. + if measurement.target_tr < old_lo.target_tr: + return ReceiveRateInterval(measurement, old_lo) + else: + return old_interval + # Lower bound is now valid. + # Next priorities depend on target Tr. + if measurement.target_tr < old_lo.target_tr: + # Lower external measurement, relevant only + # if the new measurement has high loss rate. + if measurement.loss_fraction > packet_loss_ratio: + # Returning the broader interval as old_lo + # would be invalid upper bound. + return ReceiveRateInterval(measurement, old_hi) + elif measurement.target_tr > old_hi.target_tr: + # Upper external measurement, only relevant for invalid upper bound. + if old_hi.loss_fraction <= packet_loss_ratio: + # Old upper bound becomes valid new lower bound. + return ReceiveRateInterval(old_hi, measurement) + else: + # Internal measurement, replaced boundary + # depends on measured loss fraction. + if measurement.loss_fraction > packet_loss_ratio: + # We have found a narrow valid interval, + # regardless of whether old upper bound was valid. + return ReceiveRateInterval(old_lo, measurement) + else: + # In ideal world, we would not want to shrink interval + # if upper bound is not valid. + # In the real world, we want to shrink it for + # "invalid upper bound at maximal rate" case. + return ReceiveRateInterval(measurement, old_hi) + # Fallback, the interval is unchanged by the measurement. + return old_interval + + def ndrpdr(self, state): + """Pefrom trials for this phase. Return the new state when done. + + :param state: State before this phase. + :type state: ProgressState + :returns: The updated state. + :rtype: ProgressState + :raises RuntimeError: If total duration is larger than timeout. + """ + start_time = time.time() + if state.phases > 0: + # We need to finish preceding intermediate phases first. + saved_phases = state.phases + state.phases -= 1 + # Preceding phases have shorter duration. + saved_duration = state.duration + duration_multiplier = state.duration / self.initial_trial_duration + phase_exponent = float(state.phases) / saved_phases + state.duration = self.initial_trial_duration * math.pow( + duration_multiplier, phase_exponent) + # Shorter durations do not need that narrow widths. + saved_width = state.width_goal + state.width_goal = self.double_relative_width(state.width_goal) + # Recurse. + state = self.ndrpdr(state) + # Restore the state for current phase. + state.duration = saved_duration + state.width_goal = saved_width + state.phases = saved_phases # Not needed, but just in case. + LOGGING.info( + "starting iterations with duration %s and relative width goal %s", + state.duration, state.width_goal) + while 1: + if time.time() > start_time + self.timeout: + raise RuntimeError("Optimized search takes too long.") + # Order of priorities: invalid bounds (nl, pl, nh, ph), + # then narrowing relative Tr widths. + # Durations are not priorities yet, + # they will settle on their own hopefully. + ndr_lo = state.result.ndr_interval.measured_low + ndr_hi = state.result.ndr_interval.measured_high + pdr_lo = state.result.pdr_interval.measured_low + pdr_hi = state.result.pdr_interval.measured_high + ndr_rel_width = max( + state.width_goal, state.result.ndr_interval.rel_tr_width) + pdr_rel_width = max( + state.width_goal, state.result.pdr_interval.rel_tr_width) + # If we are hitting maximal or minimal rate, we cannot shift, + # but we can re-measure. + if ndr_lo.loss_fraction > 0.0: + if ndr_lo.target_tr > state.minimum_transmit_rate: + new_tr = max( + state.minimum_transmit_rate, + self.expand_down( + ndr_rel_width, self.doublings, ndr_lo.target_tr)) + LOGGING.info("ndr lo external %s", new_tr) + state = self._measure_and_update_state(state, new_tr) + continue + elif ndr_lo.duration < state.duration: + LOGGING.info("ndr lo minimal re-measure") + state = self._measure_and_update_state( + state, state.minimum_transmit_rate) + continue + if pdr_lo.loss_fraction > state.packet_loss_ratio: + if pdr_lo.target_tr > state.minimum_transmit_rate: + new_tr = max( + state.minimum_transmit_rate, + self.expand_down( + pdr_rel_width, self.doublings, pdr_lo.target_tr)) + LOGGING.info("pdr lo external %s", new_tr) + state = self._measure_and_update_state(state, new_tr) + continue + elif pdr_lo.duration < state.duration: + LOGGING.info("pdr lo minimal re-measure") + state = self._measure_and_update_state( + state, state.minimum_transmit_rate) + continue + if ndr_hi.loss_fraction <= 0.0: + if ndr_hi.target_tr < state.maximum_transmit_rate: + new_tr = min( + state.maximum_transmit_rate, + self.expand_up( + ndr_rel_width, self.doublings, ndr_hi.target_tr)) + LOGGING.info("ndr hi external %s", new_tr) + state = self._measure_and_update_state(state, new_tr) + continue + elif ndr_hi.duration < state.duration: + LOGGING.info("ndr hi maximal re-measure") + state = self._measure_and_update_state( + state, state.maximum_transmit_rate) + continue + if pdr_hi.loss_fraction <= state.packet_loss_ratio: + if pdr_hi.target_tr < state.maximum_transmit_rate: + new_tr = min( + state.maximum_transmit_rate, + self.expand_up( + pdr_rel_width, self.doublings, pdr_hi.target_tr)) + LOGGING.info("pdr hi external %s", new_tr) + state = self._measure_and_update_state(state, new_tr) + continue + elif pdr_hi.duration < state.duration: + LOGGING.info("ndr hi maximal re-measure") + state = self._measure_and_update_state( + state, state.maximum_transmit_rate) + continue + # If we are hitting maximum_transmit_rate, + # it is still worth narrowing width, + # hoping large enough loss fraction will happen. + # But if we are hitting the minimal rate (at current duration), + # no additional measurement will help with that, + # so we can stop narrowing in this phase. + if (ndr_lo.target_tr <= state.minimum_transmit_rate + and ndr_lo.loss_fraction > 0.0): + ndr_rel_width = 0.0 + if (pdr_lo.target_tr <= state.minimum_transmit_rate + and pdr_lo.loss_fraction > state.packet_loss_ratio): + pdr_rel_width = 0.0 + if ndr_rel_width > state.width_goal: + # We have to narrow NDR width first, as NDR internal search + # can invalidate PDR (but not vice versa). + new_tr = self.half_step_up(ndr_rel_width, ndr_lo.target_tr) + LOGGING.info("Bisecting for NDR at %s", new_tr) + state = self._measure_and_update_state(state, new_tr) + continue + if pdr_rel_width > state.width_goal: + # PDR iternal search. + new_tr = self.half_step_up(pdr_rel_width, pdr_lo.target_tr) + LOGGING.info("Bisecting for PDR at %s", new_tr) + state = self._measure_and_update_state(state, new_tr) + continue + # We do not need to improve width, but there still might be + # some measurements with smaller duration. + # We need to re-measure with full duration, possibly + # creating invalid bounds to resolve (thus broadening width). + if ndr_lo.duration < state.duration: + LOGGING.info("re-measuring NDR lower bound") + state = self._measure_and_update_state(state, ndr_lo.target_tr) + continue + if pdr_lo.duration < state.duration: + LOGGING.info("re-measuring PDR lower bound") + state = self._measure_and_update_state(state, pdr_lo.target_tr) + continue + # Except when lower bounds have high loss fraction, in that case + # we do not need to re-measure _upper_ bounds. + if ndr_hi.duration < state.duration and ndr_rel_width > 0.0: + LOGGING.info("re-measuring NDR upper bound") + state = self._measure_and_update_state(state, ndr_hi.target_tr) + continue + if pdr_hi.duration < state.duration and pdr_rel_width > 0.0: + LOGGING.info("re-measuring PDR upper bound") + state = self._measure_and_update_state(state, pdr_hi.target_tr) + continue + # Widths are narrow (or lower bound minimal), bound measurements + # are long enough, we can return. + LOGGING.info("phase done") + break + return state + + def measure(self, duration, transmit_rate, latency): + duration = float(duration) + transmit_rate = float(transmit_rate) + # Trex needs target Tr per stream, but reports aggregate Tx and Dx. + unit_rate = str(transmit_rate / 2.0) + "pps" + stats = self.measurer.send_traffic_on_tg(self.ports, self.port_pg_id, + duration, unit_rate, + latency=latency) + self.measurer.client.reset(ports=self.ports) + self.measurer.client.clear_stats(ports=self.ports) + self.measurer.client.remove_all_streams(ports=self.ports) + for port, profile in self.profiles.items(): + self.measurer.client.add_streams(profile, ports=[port]) + self.collect_kpi(stats, unit_rate) + transmit_count = int(self.measurer.sent) + loss_count = int(self.measurer.loss) + measurement = ReceiveRateMeasurement( + duration, transmit_rate, transmit_count, loss_count) + measurement.latency = self.measurer.latency + return measurement + + def perform_additional_measurements_based_on_ndrpdr_result(self, result): + duration = 5.0 + rate = "{}{}".format(result.ndr_interval.measured_low.target_tr / 2.0, + 'pps') + for _ in range(0, 1): + stats = self.measurer.send_traffic_on_tg(self.ports, + self.port_pg_id, duration, + rate) + self.collect_kpi(stats, rate) + LOGGING.info('Traffic loss occurred: %s', self.measurer.loss) + + @staticmethod + def display_single_bound(result_samples, result_type, rate_total, pkt_size, + latency=None): + bandwidth_total = float(rate_total) * (pkt_size + 20) * 8 / (10 ** 9) + + result_samples["Result_{}".format(result_type)] = { + "rate_total_pps": float(rate_total), + "bandwidth_total_Gbps": float(bandwidth_total), + } + + if latency: + for item in latency: + if latency.index(item) == 0: + name = "Result_{}_{}".format("stream0", result_type) + else: + name = "Result_{}_{}".format("stream1", result_type) + lat_min, lat_avg, lat_max = item.split('/') + result_samples[name] = { + "min_latency": float(lat_min), + "avg_latency": float(lat_avg), + "max_latency": float(lat_max), + } + + @staticmethod + def check_ndrpdr_interval_validity(result_samples, result_type, interval, + packet_loss_ratio=0.0): + lower_bound = interval.measured_low + lower_bound_lf = lower_bound.loss_fraction + + result_samples["Result_{}_packets_lost".format(result_type)] = { + "packet_loss_ratio": float(lower_bound_lf), + "packets_lost": float(lower_bound.loss_count), + } + + if lower_bound_lf <= packet_loss_ratio: + return "Minimal rate loss fraction {} reach target {}".format( + lower_bound_lf, packet_loss_ratio) + else: + message = "Minimal rate loss fraction {} does not reach target {}".format( + lower_bound_lf, packet_loss_ratio) + if lower_bound_lf >= 1.0: + return '{}\nZero packets forwarded!'.format(message) + else: + return '{}\n{} packets lost.'.format(message, + lower_bound.loss_count) diff --git a/yardstick/network_services/helpers/vpp_helpers/ndr_pdr_result.py b/yardstick/network_services/helpers/vpp_helpers/ndr_pdr_result.py new file mode 100644 index 000000000..34a97f9fb --- /dev/null +++ b/yardstick/network_services/helpers/vpp_helpers/ndr_pdr_result.py @@ -0,0 +1,68 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a modified copy of +# https://gerrit.fd.io/r/gitweb?p=csit.git;a=blob_plain;f=resources/libraries/python/MLRsearch/NdrPdrResult.py;hb=HEAD + +from yardstick.network_services.helpers.vpp_helpers.receive_rate_interval import \ + ReceiveRateInterval + + +class NdrPdrResult(object): + """Two measurement intervals, return value of search algorithms. + + Partial fraction is NOT part of the result. Pdr interval should be valid + for all partial fractions implied by the interval.""" + + def __init__(self, ndr_interval, pdr_interval): + """Store the measured intervals after checking argument types. + + :param ndr_interval: Object containing data for NDR part of the result. + :param pdr_interval: Object containing data for PDR part of the result. + :type ndr_interval: ReceiveRateInterval.ReceiveRateInterval + :type pdr_interval: ReceiveRateInterval.ReceiveRateInterval + """ + # TODO: Type checking is not very pythonic, + # perhaps users can fix wrong usage without it? + if not isinstance(ndr_interval, ReceiveRateInterval): + raise TypeError("ndr_interval, is not a ReceiveRateInterval: " + "{ndr!r}".format(ndr=ndr_interval)) + if not isinstance(pdr_interval, ReceiveRateInterval): + raise TypeError("pdr_interval, is not a ReceiveRateInterval: " + "{pdr!r}".format(pdr=pdr_interval)) + self.ndr_interval = ndr_interval + self.pdr_interval = pdr_interval + + def width_in_goals(self, relative_width_goal): + """Return a debug string related to current widths in logarithmic scale. + + :param relative_width_goal: Upper bound times this is the goal + difference between upper bound and lower bound. + :type relative_width_goal: float + :returns: Message containing NDR and PDR widths in goals. + :rtype: str + """ + return "ndr {ndr_in_goals}; pdr {pdr_in_goals}".format( + ndr_in_goals=self.ndr_interval.width_in_goals(relative_width_goal), + pdr_in_goals=self.pdr_interval.width_in_goals(relative_width_goal)) + + def __str__(self): + """Return string as tuple of named values.""" + return "NDR={ndr!s};PDR={pdr!s}".format( + ndr=self.ndr_interval, pdr=self.pdr_interval) + + def __repr__(self): + """Return string evaluable as a constructor call.""" + return "NdrPdrResult(ndr_interval={ndr!r},pdr_interval={pdr!r})".format( + ndr=self.ndr_interval, pdr=self.pdr_interval) diff --git a/yardstick/network_services/helpers/vpp_helpers/receive_rate_interval.py b/yardstick/network_services/helpers/vpp_helpers/receive_rate_interval.py new file mode 100644 index 000000000..517a99c1f --- /dev/null +++ b/yardstick/network_services/helpers/vpp_helpers/receive_rate_interval.py @@ -0,0 +1,88 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a modified copy of +# https://gerrit.fd.io/r/gitweb?p=csit.git;a=blob_plain;f=resources/libraries/python/MLRsearch/ReceiveRateInterval.py;hb=HEAD + +import math + +from yardstick.network_services.helpers.vpp_helpers.receive_rate_measurement import \ + ReceiveRateMeasurement + + +class ReceiveRateInterval(object): + """Structure defining two Rr measurements, and their relation.""" + + def __init__(self, measured_low, measured_high): + """Store the bound measurements after checking argument types. + + :param measured_low: Measurement for the lower bound. + :param measured_high: Measurement for the upper bound. + :type measured_low: ReceiveRateMeasurement.ReceiveRateMeasurement + :type measured_high: ReceiveRateMeasurement.ReceiveRateMeasurement + """ + # TODO: Type checking is not very pythonic, + # perhaps users can fix wrong usage without it? + if not isinstance(measured_low, ReceiveRateMeasurement): + raise TypeError("measured_low is not a ReceiveRateMeasurement: " + "{low!r}".format(low=measured_low)) + if not isinstance(measured_high, ReceiveRateMeasurement): + raise TypeError("measured_high is not a ReceiveRateMeasurement: " + "{high!r}".format(high=measured_high)) + self.measured_low = measured_low + self.measured_high = measured_high + # Declare secondary quantities to appease pylint. + self.abs_tr_width = None + """Absolute width of target transmit rate. Upper minus lower.""" + self.rel_tr_width = None + """Relative width of target transmit rate. Absolute divided by upper.""" + self.sort() + + def sort(self): + """Sort bounds by target Tr, compute secondary quantities.""" + if self.measured_low.target_tr > self.measured_high.target_tr: + self.measured_low, self.measured_high = ( + self.measured_high, self.measured_low) + self.abs_tr_width = ( + self.measured_high.target_tr - self.measured_low.target_tr) + self.rel_tr_width = round( + self.abs_tr_width / self.measured_high.target_tr, 5) + + def width_in_goals(self, relative_width_goal): + """Return float value. + + Relative width goal is some (negative) value on logarithmic scale. + Current relative width is another logarithmic value. + Return the latter divided by the former. + This is useful when investigating how did surprising widths come to be. + + :param relative_width_goal: Upper bound times this is the goal + difference between upper bound and lower bound. + :type relative_width_goal: float + :returns: Current width as logarithmic multiple of goal width [1]. + :rtype: float + """ + return round(math.log(1.0 - self.rel_tr_width) / math.log( + 1.0 - relative_width_goal), 5) + + def __str__(self): + """Return string as half-open interval.""" + return "[{low!s};{high!s})".format( + low=self.measured_low, high=self.measured_high) + + def __repr__(self): + """Return string evaluable as a constructor call.""" + return ("ReceiveRateInterval(measured_low={low!r}" + ",measured_high={high!r})".format(low=self.measured_low, + high=self.measured_high)) diff --git a/yardstick/network_services/helpers/vpp_helpers/receive_rate_measurement.py b/yardstick/network_services/helpers/vpp_helpers/receive_rate_measurement.py new file mode 100644 index 000000000..2c59ea104 --- /dev/null +++ b/yardstick/network_services/helpers/vpp_helpers/receive_rate_measurement.py @@ -0,0 +1,58 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a modified copy of +# https://gerrit.fd.io/r/gitweb?p=csit.git;a=blob_plain;f=resources/libraries/python/MLRsearch/ReceiveRateMeasurement.py;hb=HEAD + + +class ReceiveRateMeasurement(object): + """Structure defining the result of single Rr measurement.""" + + def __init__(self, duration, target_tr, transmit_count, loss_count): + """Constructor, normalize primary and compute secondary quantities. + + :param duration: Measurement duration [s]. + :param target_tr: Target transmit rate [pps]. + If bidirectional traffic is measured, this is bidirectional rate. + :param transmit_count: Number of packets transmitted [1]. + :param loss_count: Number of packets transmitted but not received [1]. + :type duration: float + :type target_tr: float + :type transmit_count: int + :type loss_count: int + """ + self.duration = float(duration) + self.target_tr = float(target_tr) + self.transmit_count = int(transmit_count) + self.loss_count = int(loss_count) + self.receive_count = round(transmit_count - loss_count, 5) + self.transmit_rate = round(transmit_count / self.duration, 5) + self.loss_rate = round(loss_count / self.duration, 5) + self.receive_rate = round(self.receive_count / self.duration, 5) + self.loss_fraction = round( + float(self.loss_count) / self.transmit_count, 5) + # TODO: Do we want to store also the real time (duration + overhead)? + + def __str__(self): + """Return string reporting input and loss fraction.""" + return "d={dur!s},Tr={rate!s},Df={frac!s}".format( + dur=self.duration, rate=self.target_tr, frac=self.loss_fraction) + + def __repr__(self): + """Return string evaluable as a constructor call.""" + return ("ReceiveRateMeasurement(duration={dur!r},target_tr={rate!r}" + ",transmit_count={trans!r},loss_count={loss!r})".format( + dur=self.duration, rate=self.target_tr, + trans=self.transmit_count, + loss=self.loss_count)) diff --git a/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py b/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py index cc627ef78..89a855480 100644 --- a/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py +++ b/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2018 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +14,8 @@ import ipaddress import logging +import re +import collections import IxNetwork @@ -62,6 +64,13 @@ SUPPORTED_TOS_FIELDS = [ 'reliability' ] +IP_PRIORITY_PATTERN = r'[^\w+]*.+(Raw priority|' \ + 'Precedence|' \ + 'Default PHB|' \ + 'Class selector PHB|' \ + 'Assured forwarding selector PHB|' \ + 'Expedited forwarding PHB)' + class Vlan(object): def __init__(self, @@ -85,12 +94,8 @@ class IxNextgen(object): # pragma: no cover "port_name": 'Port Name', "Frames_Tx": 'Frames Tx.', "Valid_Frames_Rx": 'Valid Frames Rx.', - "Frames_Tx_Rate": 'Frames Tx. Rate', - "Valid_Frames_Rx_Rate": 'Valid Frames Rx. Rate', - "Tx_Rate_Kbps": 'Tx. Rate (Kbps)', - "Rx_Rate_Kbps": 'Rx. Rate (Kbps)', - "Tx_Rate_Mbps": 'Tx. Rate (Mbps)', - "Rx_Rate_Mbps": 'Rx. Rate (Mbps)', + "Bytes_Tx": 'Bytes Tx.', + "Bytes_Rx": 'Bytes Rx.' } LATENCY_NAME_MAP = { @@ -99,6 +104,18 @@ class IxNextgen(object): # pragma: no cover "Store-Forward_Max_latency_ns": 'Store-Forward Max Latency (ns)', } + FLOWS_STATS_NAME_MAP = { + "Tx_Port": 'Tx Port', + "VLAN-ID": 'VLAN:VLAN-ID', + "IP_Priority": re.compile(IP_PRIORITY_PATTERN), + "Flow_Group": 'Flow Group', + "Tx_Frames": 'Tx Frames', + "Rx_Frames": 'Rx Frames', + "Store-Forward_Avg_latency_ns": 'Store-Forward Avg Latency (ns)', + "Store-Forward_Min_latency_ns": 'Store-Forward Min Latency (ns)', + "Store-Forward_Max_latency_ns": 'Store-Forward Max Latency (ns)' + } + PPPOX_CLIENT_PER_PORT_NAME_MAP = { 'subs_port': 'Port', 'Sessions_Up': 'Sessions Up', @@ -111,6 +128,18 @@ class IxNextgen(object): # pragma: no cover FLOW_STATISTICS = '::ixNet::OBJ-/statistics/view:"Flow Statistics"' PPPOX_CLIENT_PER_PORT = '::ixNet::OBJ-/statistics/view:"PPPoX Client Per Port"' + PPPOE_SCENARIO_STATS = { + 'port_statistics': PORT_STATISTICS, + 'flow_statistic': FLOW_STATISTICS, + 'pppox_client_per_port': PPPOX_CLIENT_PER_PORT + } + + PPPOE_SCENARIO_STATS_MAP = { + 'port_statistics': PORT_STATS_NAME_MAP, + 'flow_statistic': FLOWS_STATS_NAME_MAP, + 'pppox_client_per_port': PPPOX_CLIENT_PER_PORT_NAME_MAP + } + @staticmethod def get_config(tg_cfg): card = [] @@ -372,7 +401,25 @@ class IxNextgen(object): # pragma: no cover '/traffic/protocolTemplate:"{}"'.format(protocol_name)) self.ixnet.execute('append', previous_element, protocol) - def _setup_config_elements(self, add_default_proto=True): + def is_qinq(self, flow_data): + for traffic_type in flow_data: + if flow_data[traffic_type]['outer_l2'].get('QinQ'): + return True + return False + + def _flows_settings(self, cfg): + flows_data = [] + res = [key for key in cfg.keys() if key.split('_')[0] in ['uplink', 'downlink']] + for i in range(len(res)): + uplink = 'uplink_{}'.format(i) + downlink = 'downlink_{}'.format(i) + if uplink in res: + flows_data.append(cfg[uplink]) + if downlink in res: + flows_data.append(cfg[downlink]) + return flows_data + + def _setup_config_elements(self, traffic_profile, add_default_proto=True): """Setup the config elements The traffic item is configured to allow individual configurations per @@ -388,7 +435,9 @@ class IxNextgen(object): # pragma: no cover 'trafficItem')[0] log.info('Split the frame rate distribution per config element') config_elements = self.ixnet.getList(traffic_item_id, 'configElement') - for config_element in config_elements: + flows = self._flows_settings(traffic_profile.params) + # TODO: check length of both lists, it should be equal!!! + for config_element, flow_data in zip(config_elements, flows): self.ixnet.setAttribute(config_element + '/frameRateDistribution', '-portDistribution', 'splitRateEvenly') self.ixnet.setAttribute(config_element + '/frameRateDistribution', @@ -399,8 +448,13 @@ class IxNextgen(object): # pragma: no cover PROTO_UDP, config_element + '/stack:"ethernet-1"') self._append_procotol_to_stack( PROTO_IPV4, config_element + '/stack:"ethernet-1"') + if self.is_qinq(flow_data): + self._append_procotol_to_stack( + PROTO_VLAN, config_element + '/stack:"ethernet-1"') + self._append_procotol_to_stack( + PROTO_VLAN, config_element + '/stack:"ethernet-1"') - def create_traffic_model(self, uplink_ports, downlink_ports): + def create_traffic_model(self, uplink_ports, downlink_ports, traffic_profile): """Create a traffic item and the needed flow groups Each flow group inside the traffic item (only one is present) @@ -415,9 +469,10 @@ class IxNextgen(object): # pragma: no cover uplink_endpoints = [port + '/protocols' for port in uplink_ports] downlink_endpoints = [port + '/protocols' for port in downlink_ports] self._create_flow_groups(uplink_endpoints, downlink_endpoints) - self._setup_config_elements() + self._setup_config_elements(traffic_profile=traffic_profile) - def create_ipv4_traffic_model(self, uplink_endpoints, downlink_endpoints): + def create_ipv4_traffic_model(self, uplink_endpoints, downlink_endpoints, + traffic_profile): """Create a traffic item and the needed flow groups Each flow group inside the traffic item (only one is present) @@ -430,7 +485,8 @@ class IxNextgen(object): # pragma: no cover """ self._create_traffic_item('ipv4') self._create_flow_groups(uplink_endpoints, downlink_endpoints) - self._setup_config_elements(False) + self._setup_config_elements(traffic_profile=traffic_profile, + add_default_proto=False) def _update_frame_mac(self, ethernet_descriptor, field, mac_address): """Set the MAC address in a config element stack Ethernet field @@ -497,11 +553,6 @@ class IxNextgen(object): # pragma: no cover '-fieldValue', ETHER_TYPE_802_1ad, '-valueType', SINGLE_VALUE) - self._append_procotol_to_stack( - PROTO_VLAN, config_element + '/stack:"ethernet-1"') - self._append_procotol_to_stack( - PROTO_VLAN, config_element + '/stack:"ethernet-1"') - self._update_vlan_tag(fg_id, s_vlan, S_VLAN) self._update_vlan_tag(fg_id, c_vlan, C_VLAN) @@ -731,6 +782,19 @@ class IxNextgen(object): # pragma: no cover 'getColumnValues', view_obj, data_ixia) for data_yardstick, data_ixia in name_map.items()} + def _get_view_page_stats(self, view_obj): + """Get full view page stats + + :param view_obj: view object, e.g. + '::ixNet::OBJ-/statistics/view:"Port Statistics"' + :return: (list) List of dicts. Each dict represents view page row + """ + view = view_obj + '/page' + column_headers = self.ixnet.getAttribute(view, '-columnCaptions') + view_rows = self.ixnet.getAttribute(view, '-rowValues') + view_page = [dict(zip(column_headers, row[0])) for row in view_rows] + return view_page + def _set_egress_flow_tracking(self, encapsulation, offset): """Set egress flow tracking options @@ -753,7 +817,7 @@ class IxNextgen(object): # pragma: no cover self.ixnet.setAttribute(enc_obj, '-offset', offset) self.ixnet.commit() - def _set_flow_tracking(self, track_by): + def set_flow_tracking(self, track_by): """Set flow tracking options :param track_by: list of tracking fields @@ -780,24 +844,39 @@ class IxNextgen(object): # pragma: no cover return stats def get_pppoe_scenario_statistics(self): - """Retrieve port, flow and PPPoE subscribers statistics - - "Port Statistics" parameters are stored in self.PORT_STATS_NAME_MAP. - "Flow Statistics" parameters are stored in self.LATENCY_NAME_MAP. - "PPPoX Client Per Port" parameters are stored in - self.PPPOE_CLIENT_PER_PORT_NAME_MAP - - :return: dictionary with the statistics; the keys of this dictionary - are PORT_STATS_NAME_MAP, LATENCY_NAME_MAP and - PPPOE_CLIENT_PER_PORT_NAME_MAP keys. - """ - stats = self._build_stats_map(self.PORT_STATISTICS, - self.PORT_STATS_NAME_MAP) - stats.update(self._build_stats_map(self.FLOW_STATISTICS, - self.LATENCY_NAME_MAP)) - stats.update(self._build_stats_map(self.PPPOX_CLIENT_PER_PORT, - self.PPPOX_CLIENT_PER_PORT_NAME_MAP)) - return stats + """Retrieve port, flow and PPPoE subscribers statistics""" + stats = collections.defaultdict(list) + result = collections.defaultdict(list) + for stat, view in self.PPPOE_SCENARIO_STATS.items(): + # Get view total pages number + total_pages = self.ixnet.getAttribute( + view + '/page', '-totalPages') + # Collect stats from all view pages + for page in range(1, int(total_pages) + 1): + current_page = int(self.ixnet.getAttribute( + view + '/page', '-currentPage')) + if page != int(current_page): + self.ixnet.setAttribute(view + '/page', '-currentPage', + str(page)) + self.ixnet.commit() + page_data = self._get_view_page_stats(view) + stats[stat].extend(page_data) + # Filter collected views stats + for stat in stats: + for view_row in stats[stat]: + filtered_row = {} + for key, value in self.PPPOE_SCENARIO_STATS_MAP[stat].items(): + if isinstance(value, str): + filtered_row.update({key: view_row[value]}) + # Handle keys which values are represented by regex + else: + for k in view_row.keys(): + if value.match(k): + value = value.match(k).group() + filtered_row.update({key: view_row[value]}) + break + result[stat].append(filtered_row) + return result def start_protocols(self): self.ixnet.execute('startAllProtocols') diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py index 5922bd3b9..ba49ab5b4 100644 --- a/yardstick/network_services/nfvi/resource.py +++ b/yardstick/network_services/nfvi/resource.py @@ -48,6 +48,7 @@ class ResourceProfile(object): This profile adds a resource at the beginning of the test session """ COLLECTD_CONF = "collectd.conf" + BAR_COLLECTD_CONF_PATH = "/opt/collectd/etc/collectd.conf.d/" AMPQ_PORT = 5672 DEFAULT_INTERVAL = 25 DEFAULT_TIMEOUT = 3600 @@ -248,6 +249,8 @@ class ResourceProfile(object): "plugins": self.plugins, } self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs) + self._provide_config_file(self.BAR_COLLECTD_CONF_PATH, + self.COLLECTD_CONF, kwargs) def _setup_ovs_stats(self, connection): try: diff --git a/yardstick/network_services/traffic_profile/__init__.py b/yardstick/network_services/traffic_profile/__init__.py index 72a61b6b4..85b3d54a0 100644 --- a/yardstick/network_services/traffic_profile/__init__.py +++ b/yardstick/network_services/traffic_profile/__init__.py @@ -30,6 +30,8 @@ def register_modules(): 'yardstick.network_services.traffic_profile.rfc2544', 'yardstick.network_services.traffic_profile.pktgen', 'yardstick.network_services.traffic_profile.landslide_profile', + 'yardstick.network_services.traffic_profile.vpp_rfc2544', + 'yardstick.network_services.traffic_profile.sip', ] for module in modules: diff --git a/yardstick/network_services/traffic_profile/http_ixload.py b/yardstick/network_services/traffic_profile/http_ixload.py index b88aadff7..ec0762500 100644 --- a/yardstick/network_services/traffic_profile/http_ixload.py +++ b/yardstick/network_services/traffic_profile/http_ixload.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,14 +16,6 @@ import sys import os import logging import collections -import subprocess -try: - libs = subprocess.check_output( - 'python -c "import site; print(site.getsitepackages())"', shell=True) - - sys.path.extend(libs[1:-1].replace("'", "").split(',')) -except subprocess.CalledProcessError: - pass # ixload uses its own py2. So importing jsonutils fails. So adding below # workaround to support call from yardstick @@ -32,14 +24,26 @@ try: except ImportError: import json as jsonutils -from yardstick.common import exceptions #pylint: disable=wrong-import-position + +class ErrorClass(object): + + def __init__(self, *args, **kwargs): + if 'test' not in kwargs: + raise RuntimeError + + def __getattr__(self, item): + raise AttributeError + + +class InvalidRxfFile(Exception): + message = 'Loaded rxf file has unexpected format' + try: from IxLoad import IxLoad, StatCollectorUtils except ImportError: - IxLoad = exceptions.ErrorClass - StatCollectorUtils = exceptions.ErrorClass - + IxLoad = ErrorClass + StatCollectorUtils = ErrorClass LOG = logging.getLogger(__name__) CSV_FILEPATH_NAME = 'IxL_statResults.csv' @@ -205,7 +209,7 @@ class IXLOADHttpTest(object): ipAddress=address, gatewayAddress=gateway) except Exception: - raise exceptions.InvalidRxfFile + raise InvalidRxfFile def update_network_mac_address(self, net_traffic, mac): """Update MACaddress for net_traffic object @@ -233,7 +237,7 @@ class IXLOADHttpTest(object): "MacRange") mac_range.config(mac=mac) except Exception: - raise exceptions.InvalidRxfFile + raise InvalidRxfFile def update_network_param(self, net_traffic, param): """Update net_traffic by parameters specified in param""" @@ -301,7 +305,7 @@ class IXLOADHttpTest(object): ix_http_command = activity.agent.actionList[0] ix_http_command.config(pageObject=page_object) except Exception: - raise exceptions.InvalidRxfFile + raise InvalidRxfFile def update_user_count(self, net_traffic, user_count): """Update userObjectiveValue field in activity object in net_traffic @@ -318,7 +322,7 @@ class IXLOADHttpTest(object): activity = net_traffic.activityList[0] activity.config(userObjectiveValue=user_count) except Exception: - raise exceptions.InvalidRxfFile + raise InvalidRxfFile def start_http_test(self): self.ix_load = IxLoad() diff --git a/yardstick/network_services/traffic_profile/ixia_rfc2544.py b/yardstick/network_services/traffic_profile/ixia_rfc2544.py index 35038891b..ca45b500d 100644 --- a/yardstick/network_services/traffic_profile/ixia_rfc2544.py +++ b/yardstick/network_services/traffic_profile/ixia_rfc2544.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): UPLINK = 'uplink' DOWNLINK = 'downlink' DROP_PERCENT_ROUND = 6 - RATE_ROUND = 5 STATUS_SUCCESS = "Success" STATUS_FAIL = "Failure" @@ -36,6 +35,7 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): super(IXIARFC2544Profile, self).__init__(yaml_data) self.rate = self.config.frame_rate self.rate_unit = self.config.rate_unit + self.iteration = 0 self.full_profile = {} def _get_ip_and_mask(self, ip_range): @@ -146,12 +146,16 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): return result - def _ixia_traffic_generate(self, traffic, ixia_obj): + def _ixia_traffic_generate(self, traffic, ixia_obj, traffic_gen): ixia_obj.update_frame(traffic, self.config.duration) ixia_obj.update_ip_packet(traffic) ixia_obj.update_l4(traffic) + self._update_traffic_tracking_options(traffic_gen) ixia_obj.start_traffic() + def _update_traffic_tracking_options(self, traffic_gen): + traffic_gen.update_tracking_options() + def update_traffic_profile(self, traffic_generator): def port_generator(): for vld_id, intfs in sorted(traffic_generator.networks.items()): @@ -176,25 +180,34 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): self.max_rate = self.rate self.min_rate = 0.0 else: - self.rate = round(float(self.max_rate + self.min_rate) / 2.0, - self.RATE_ROUND) + self.rate = self._get_next_rate() + self.iteration = traffic_generator.rfc_helper.iteration.value traffic = self._get_ixia_traffic_profile(self.full_profile, mac) - self._ixia_traffic_generate(traffic, ixia_obj) + self._ixia_traffic_generate(traffic, ixia_obj, traffic_generator) return first_run + # pylint: disable=unused-argument def get_drop_percentage(self, samples, tol_min, tolerance, precision, - first_run=False): + resolution, first_run=False, tc_rfc2544_opts=None): completed = False - drop_percent = 100 + drop_percent = 100.0 num_ifaces = len(samples) duration = self.config.duration in_packets_sum = sum( - [samples[iface]['in_packets'] for iface in samples]) + [samples[iface]['InPackets'] for iface in samples]) out_packets_sum = sum( - [samples[iface]['out_packets'] for iface in samples]) + [samples[iface]['OutPackets'] for iface in samples]) + in_bytes_sum = sum( + [samples[iface]['InBytes'] for iface in samples]) + out_bytes_sum = sum( + [samples[iface]['OutBytes'] for iface in samples]) rx_throughput = round(float(in_packets_sum) / duration, 3) tx_throughput = round(float(out_packets_sum) / duration, 3) + # Rx throughput in Bps + rx_throughput_bps = round(float(in_bytes_sum) / duration, 3) + # Tx throughput in Bps + tx_throughput_bps = round(float(out_bytes_sum) / duration, 3) packet_drop = abs(out_packets_sum - in_packets_sum) try: @@ -217,19 +230,23 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): else: completed = True + last_rate = self.rate + next_rate = self._get_next_rate() + if abs(next_rate - self.rate) < resolution: + LOG.debug("rate=%s, next_rate=%s, resolution=%s", self.rate, + next_rate, resolution) + # stop test if the difference between the rate transmission + # in two iterations is smaller than the value of the resolution + completed = True + LOG.debug("tolerance=%s, tolerance_precision=%s drop_percent=%s " "completed=%s", tolerance, precision, drop_percent, completed) - latency_ns_avg = float( - sum([samples[iface]['Store-Forward_Avg_latency_ns'] - for iface in samples])) / num_ifaces - latency_ns_min = float( - sum([samples[iface]['Store-Forward_Min_latency_ns'] - for iface in samples])) / num_ifaces - latency_ns_max = float( - sum([samples[iface]['Store-Forward_Max_latency_ns'] - for iface in samples])) / num_ifaces + latency_ns_avg = float(sum( + [samples[iface]['LatencyAvg'] for iface in samples])) / num_ifaces + latency_ns_min = min([samples[iface]['LatencyMin'] for iface in samples]) + latency_ns_max = max([samples[iface]['LatencyMax'] for iface in samples]) samples['Status'] = self.STATUS_FAIL if round(drop_percent, precision) <= tolerance: @@ -237,10 +254,15 @@ class IXIARFC2544Profile(trex_traffic_profile.TrexProfile): samples['TxThroughput'] = tx_throughput samples['RxThroughput'] = rx_throughput + samples['TxThroughputBps'] = tx_throughput_bps + samples['RxThroughputBps'] = rx_throughput_bps samples['DropPercentage'] = drop_percent - samples['latency_ns_avg'] = latency_ns_avg - samples['latency_ns_min'] = latency_ns_min - samples['latency_ns_max'] = latency_ns_max + samples['LatencyAvg'] = latency_ns_avg + samples['LatencyMin'] = latency_ns_min + samples['LatencyMax'] = latency_ns_max + samples['Rate'] = last_rate + samples['PktSize'] = self._get_framesize() + samples['Iteration'] = self.iteration return completed, samples @@ -264,12 +286,140 @@ class IXIARFC2544PppoeScenarioProfile(IXIARFC2544Profile): self.full_profile.update({downlink: self.params[downlink]}) def update_traffic_profile(self, traffic_generator): + + networks = collections.OrderedDict() + + # Sort network interfaces pairs + for i in range(len(traffic_generator.networks)): + uplink = '_'.join([self.UPLINK, str(i)]) + downlink = '_'.join([self.DOWNLINK, str(i)]) + if uplink in traffic_generator.networks: + networks[uplink] = traffic_generator.networks[uplink] + if downlink in traffic_generator.networks: + networks[downlink] = traffic_generator.networks[downlink] + def port_generator(): - for vld_id, intfs in sorted(traffic_generator.networks.items()): - if not vld_id.startswith((self.UPLINK, self.DOWNLINK)): - continue + for intfs in networks.values(): for intf in intfs: yield traffic_generator.vnfd_helper.port_num(intf) self._get_flow_groups_params() self.ports = [port for port in port_generator()] + + def _get_prio_flows_drop_percentage(self, stats): + drop_percent = 100 + for prio_id in stats: + prio_flow = stats[prio_id] + sum_packet_drop = abs(prio_flow['OutPackets'] - prio_flow['InPackets']) + try: + drop_percent = round( + (sum_packet_drop / float(prio_flow['OutPackets'])) * 100, + self.DROP_PERCENT_ROUND) + except ZeroDivisionError: + LOG.info('No traffic is flowing') + prio_flow['DropPercentage'] = drop_percent + return stats + + def _get_summary_pppoe_subs_counters(self, samples): + result = {} + keys = ['SessionsUp', + 'SessionsDown', + 'SessionsNotStarted', + 'SessionsTotal'] + for key in keys: + result[key] = \ + sum([samples[port][key] for port in samples + if key in samples[port]]) + return result + + def get_drop_percentage(self, samples, tol_min, tolerance, precision, + resolution, first_run=False, tc_rfc2544_opts=None): + completed = False + sum_drop_percent = 100 + num_ifaces = len(samples) + duration = self.config.duration + last_rate = self.rate + priority_stats = samples.pop('priority_stats') + priority_stats = self._get_prio_flows_drop_percentage(priority_stats) + summary_subs_stats = self._get_summary_pppoe_subs_counters(samples) + in_packets_sum = sum( + [samples[iface]['InPackets'] for iface in samples]) + out_packets_sum = sum( + [samples[iface]['OutPackets'] for iface in samples]) + in_bytes_sum = sum( + [samples[iface]['InBytes'] for iface in samples]) + out_bytes_sum = sum( + [samples[iface]['OutBytes'] for iface in samples]) + rx_throughput = round(float(in_packets_sum) / duration, 3) + tx_throughput = round(float(out_packets_sum) / duration, 3) + # Rx throughput in Bps + rx_throughput_bps = round(float(in_bytes_sum) / duration, 3) + # Tx throughput in Bps + tx_throughput_bps = round(float(out_bytes_sum) / duration, 3) + sum_packet_drop = abs(out_packets_sum - in_packets_sum) + + try: + sum_drop_percent = round( + (sum_packet_drop / float(out_packets_sum)) * 100, + self.DROP_PERCENT_ROUND) + except ZeroDivisionError: + LOG.info('No traffic is flowing') + + latency_ns_avg = float(sum( + [samples[iface]['LatencyAvg'] for iface in samples])) / num_ifaces + latency_ns_min = min([samples[iface]['LatencyMin'] for iface in samples]) + latency_ns_max = max([samples[iface]['LatencyMax'] for iface in samples]) + + samples['TxThroughput'] = tx_throughput + samples['RxThroughput'] = rx_throughput + samples['TxThroughputBps'] = tx_throughput_bps + samples['RxThroughputBps'] = rx_throughput_bps + samples['DropPercentage'] = sum_drop_percent + samples['LatencyAvg'] = latency_ns_avg + samples['LatencyMin'] = latency_ns_min + samples['LatencyMax'] = latency_ns_max + samples['Priority'] = priority_stats + samples['Rate'] = last_rate + samples['PktSize'] = self._get_framesize() + samples['Iteration'] = self.iteration + samples.update(summary_subs_stats) + + if tc_rfc2544_opts: + priority = tc_rfc2544_opts.get('priority') + if priority: + drop_percent = samples['Priority'][priority]['DropPercentage'] + else: + drop_percent = sum_drop_percent + else: + drop_percent = sum_drop_percent + + if first_run: + completed = True if drop_percent <= tolerance else False + if (first_run and + self.rate_unit == tp_base.TrafficProfileConfig.RATE_FPS): + self.rate = float(out_packets_sum) / duration / num_ifaces + + if drop_percent > tolerance: + self.max_rate = self.rate + elif drop_percent < tol_min: + self.min_rate = self.rate + else: + completed = True + + next_rate = self._get_next_rate() + if abs(next_rate - self.rate) < resolution: + LOG.debug("rate=%s, next_rate=%s, resolution=%s", self.rate, + next_rate, resolution) + # stop test if the difference between the rate transmission + # in two iterations is smaller than the value of the resolution + completed = True + + LOG.debug("tolerance=%s, tolerance_precision=%s drop_percent=%s " + "completed=%s", tolerance, precision, drop_percent, + completed) + + samples['Status'] = self.STATUS_FAIL + if round(drop_percent, precision) <= tolerance: + samples['Status'] = self.STATUS_SUCCESS + + return completed, samples diff --git a/yardstick/network_services/traffic_profile/rfc2544.py b/yardstick/network_services/traffic_profile/rfc2544.py index e33c437c9..aaa491b75 100644 --- a/yardstick/network_services/traffic_profile/rfc2544.py +++ b/yardstick/network_services/traffic_profile/rfc2544.py @@ -23,7 +23,7 @@ from yardstick.common import constants from yardstick.network_services.traffic_profile import trex_traffic_profile -LOGGING = logging.getLogger(__name__) +LOG = logging.getLogger(__name__) SRC_PORT = 'sport' DST_PORT = 'dport' @@ -72,14 +72,16 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): """TRex RFC2544 traffic profile""" TOLERANCE_LIMIT = 0.01 + STATUS_SUCCESS = "Success" + STATUS_FAIL = "Failure" def __init__(self, traffic_generator): super(RFC2544Profile, self).__init__(traffic_generator) self.generator = None + self.iteration = 0 self.rate = self.config.frame_rate self.max_rate = self.config.frame_rate self.min_rate = 0 - self.drop_percent_max = 0 def register_generator(self, generator): self.generator = generator @@ -126,6 +128,7 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): self.generator.client.start(ports=ports, duration=self.config.duration, force=True) + self.iteration = self.generator.rfc2544_helper.iteration.value return ports, port_pg_id def _create_profile(self, profile_data, rate, port_pg_id, enable_latency): @@ -142,7 +145,7 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): return trex_stl_streams.STLProfile(streams) def _create_imix_data(self, imix, - weight_mode=constants.DISTRIBUTION_IN_PACKETS): + weight_mode=constants.DISTRIBUTION_IN_BYTES): """Generate the IMIX distribution for a STL profile The input information is the framesize dictionary in a test case @@ -192,13 +195,13 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): imix_dip = {size: float(weight) / weight_normalize for size, weight in imix_count.items()} - if weight_mode == constants.DISTRIBUTION_IN_BYTES: + if weight_mode == constants.DISTRIBUTION_IN_PACKETS: return imix_dip byte_total = sum([int(size) * weight - for size, weight in imix_dip.items()]) - return {size: (int(size) * weight * 100) / byte_total - for size, weight in imix_dip.items()} + for size, weight in imix_count.items()]) + return {size: float(int(size) * weight * 100) / byte_total + for size, weight in imix_count.items()} def _create_vm(self, packet_definition): """Create the STL Raw instructions""" @@ -271,17 +274,24 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): return streams def get_drop_percentage(self, samples, tol_low, tol_high, - correlated_traffic): + correlated_traffic, resolution): # pylint: disable=unused-argument """Calculate the drop percentage and run the traffic""" completed = False + status = self.STATUS_FAIL out_pkt_end = sum(port['out_packets'] for port in samples[-1].values()) in_pkt_end = sum(port['in_packets'] for port in samples[-1].values()) out_pkt_ini = sum(port['out_packets'] for port in samples[0].values()) in_pkt_ini = sum(port['in_packets'] for port in samples[0].values()) + in_bytes_ini = sum(port['in_bytes'] for port in samples[0].values()) + out_bytes_ini = sum(port['out_bytes'] for port in samples[0].values()) + in_bytes_end = sum(port['in_bytes'] for port in samples[-1].values()) + out_bytes_end = sum(port['out_bytes'] for port in samples[-1].values()) time_diff = (list(samples[-1].values())[0]['timestamp'] - list(samples[0].values())[0]['timestamp']).total_seconds() out_packets = out_pkt_end - out_pkt_ini in_packets = in_pkt_end - in_pkt_ini + out_bytes = out_bytes_end - out_bytes_ini + in_bytes = in_bytes_end - in_bytes_ini tx_rate_fps = float(out_packets) / time_diff rx_rate_fps = float(in_packets) / time_diff drop_percent = 100.0 @@ -298,26 +308,55 @@ class RFC2544Profile(trex_traffic_profile.TrexProfile): elif drop_percent < tol_low: self.min_rate = self.rate else: + status = self.STATUS_SUCCESS completed = True last_rate = self.rate - self.rate = round(float(self.max_rate + self.min_rate) / 2.0, 5) - - throughput = rx_rate_fps * 2 if correlated_traffic else rx_rate_fps - - if drop_percent > self.drop_percent_max: - self.drop_percent_max = drop_percent - - latency = {port_num: value['latency'] - for port_num, value in samples[-1].items()} + self.rate = self._get_next_rate() + if abs(last_rate - self.rate) < resolution: + # stop test if the difference between the rate transmission + # in two iterations is smaller than the value of the resolution + completed = True + LOG.debug("rate=%s, next_rate=%s, resolution=%s, completed=%s", + last_rate, self.rate, resolution, completed) + + ports = samples[-1].keys() + num_ports = len(ports) + + output = {} + for port in ports: + output[port] = {} + first = samples[0][port] + last = samples[-1][port] + output[port]['InPackets'] = last['in_packets'] - first['in_packets'] + output[port]['OutPackets'] = last['out_packets'] - first['out_packets'] + output[port]['InBytes'] = last['in_bytes'] - first['in_bytes'] + output[port]['OutBytes'] = last['out_bytes'] - first['out_bytes'] + if self.config.enable_latency: + output[port]['LatencyAvg'] = float(sum( + [last['latency'][id]['average'] for id in + last['latency']]) * 1000) / len(last['latency']) + output[port]['LatencyMin'] = min( + [last['latency'][id]['total_min'] for id in + last['latency']]) * 1000 + output[port]['LatencyMax'] = max( + [last['latency'][id]['total_max'] for id in + last['latency']]) * 1000 + + output['TxThroughput'] = tx_rate_fps + output['RxThroughput'] = rx_rate_fps + output['RxThroughputBps'] = round(float(in_bytes) / time_diff, 3) + output['TxThroughputBps'] = round(float(out_bytes) / time_diff, 3) + output['DropPercentage'] = drop_percent + output['Rate'] = last_rate + output['PktSize'] = self._get_framesize() + output['Iteration'] = self.iteration + output['Status'] = status + + if self.config.enable_latency: + output['LatencyAvg'] = float( + sum([output[port]['LatencyAvg'] for port in ports])) / num_ports + output['LatencyMin'] = min([output[port]['LatencyMin'] for port in ports]) + output['LatencyMax'] = max([output[port]['LatencyMax'] for port in ports]) - output = { - 'TxThroughput': tx_rate_fps, - 'RxThroughput': rx_rate_fps, - 'CurrentDropPercentage': drop_percent, - 'Throughput': throughput, - 'DropPercentage': self.drop_percent_max, - 'Rate': last_rate, - 'Latency': latency - } return completed, output diff --git a/yardstick/network_services/traffic_profile/sip.py b/yardstick/network_services/traffic_profile/sip.py new file mode 100644 index 000000000..d18574090 --- /dev/null +++ b/yardstick/network_services/traffic_profile/sip.py @@ -0,0 +1,32 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from yardstick.network_services.traffic_profile import base + + +class SipProfile(base.TrafficProfile): + """ Sipp Traffic profile """ + + def __init__(self, yaml_data): + super(SipProfile, self).__init__(yaml_data) + self.generator = None + + def execute_traffic(self, traffic_generator=None): + if traffic_generator is not None and self.generator is None: + self.generator = traffic_generator + + def is_ended(self): + if self.generator is not None: + return self.generator.is_ended() + return False diff --git a/yardstick/network_services/traffic_profile/trex_traffic_profile.py b/yardstick/network_services/traffic_profile/trex_traffic_profile.py index ed0355fa5..cf538d488 100644 --- a/yardstick/network_services/traffic_profile/trex_traffic_profile.py +++ b/yardstick/network_services/traffic_profile/trex_traffic_profile.py @@ -52,6 +52,7 @@ class TrexProfile(base.TrafficProfile): IPv6: ('ip6_packet', Pkt.IPv6), UDP: ('udp_packet', Pkt.UDP), } + RATE_ROUND = 5 def _general_single_action_partial(self, protocol): def f(field): @@ -186,6 +187,8 @@ class TrexProfile(base.TrafficProfile): self.qinq = False self.vm_flow_vars = [] self.packets = [] + self.max_rate = 0 + self.min_rate = 0 self._map_proto_actions = { # the tuple is (single value function, range value function, if the values should be @@ -337,6 +340,25 @@ class TrexProfile(base.TrafficProfile): if 'dstport' in outer_l4: self._set_proto_addr(UDP, DST_PORT, outer_l4['dstport'], outer_l4['count']) + def _get_next_rate(self): + rate = round(float(self.max_rate + self.min_rate)/2.0, self.RATE_ROUND) + return rate + + def _get_framesize(self): + framesizes = [] + for traffickey, value in self.params.items(): + if not traffickey.startswith((self.UPLINK, self.DOWNLINK)): + continue + for _, data in value.items(): + framesize = data['outer_l2']['framesize'] + for size in (s for s, w in framesize.items() if int(w) != 0): + framesizes.append(size) + if len(set(framesizes)) == 0: + return '' + elif len(set(framesizes)) == 1: + return framesizes[0] + return 'IMIX' + @classmethod def _count_ip(cls, start_ip, end_ip): start = ipaddress.ip_address(six.u(start_ip)) diff --git a/yardstick/network_services/traffic_profile/vpp_rfc2544.py b/yardstick/network_services/traffic_profile/vpp_rfc2544.py new file mode 100644 index 000000000..412e4e69a --- /dev/null +++ b/yardstick/network_services/traffic_profile/vpp_rfc2544.py @@ -0,0 +1,339 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import ipaddress +import logging +import random +import string + +from trex_stl_lib import api as Pkt +from trex_stl_lib import trex_stl_client +from trex_stl_lib import trex_stl_packet_builder_scapy +from trex_stl_lib import trex_stl_streams + +from yardstick.common import constants +from yardstick.network_services.helpers.vpp_helpers.multiple_loss_ratio_search import \ + MultipleLossRatioSearch +from yardstick.network_services.traffic_profile.rfc2544 import RFC2544Profile, \ + PortPgIDMap +from yardstick.network_services.traffic_profile.trex_traffic_profile import IP, \ + DST + +LOGGING = logging.getLogger(__name__) + + +class VppRFC2544Profile(RFC2544Profile): + + def __init__(self, traffic_generator): + super(VppRFC2544Profile, self).__init__(traffic_generator) + + tp_cfg = traffic_generator["traffic_profile"] + self.number_of_intermediate_phases = tp_cfg.get("intermediate_phases", + 2) + + self.duration = self.config.duration + self.precision = self.config.test_precision + self.lower_bound = self.config.lower_bound + self.upper_bound = self.config.upper_bound + self.step_interval = self.config.step_interval + self.enable_latency = self.config.enable_latency + + self.pkt_size = None + self.flow = None + + self.tolerance_low = 0 + self.tolerance_high = 0 + + self.queue = None + self.port_pg_id = None + + self.current_lower = self.lower_bound + self.current_upper = self.upper_bound + + self.ports = [] + self.profiles = {} + + @property + def delta(self): + return self.current_upper - self.current_lower + + @property + def mid_point(self): + return (self.current_lower + self.current_upper) / 2 + + @staticmethod + def calculate_frame_size(imix): + if not imix: + return 64, 100 + + imix_count = {size.upper().replace('B', ''): int(weight) + for size, weight in imix.items()} + imix_sum = sum(imix_count.values()) + if imix_sum <= 0: + return 64, 100 + packets_total = sum([int(size) * weight + for size, weight in imix_count.items()]) + return packets_total / imix_sum, imix_sum + + @staticmethod + def _gen_payload(length): + payload = "" + for _ in range(length): + payload += random.choice(string.ascii_letters) + + return payload + + def bounds_iterator(self, logger=None): + self.current_lower = self.lower_bound + self.current_upper = self.upper_bound + + test_value = self.current_upper + while abs(self.delta) >= self.precision: + if logger: + logger.debug("New interval [%s, %s), precision: %d", + self.current_lower, + self.current_upper, self.step_interval) + logger.info("Testing with value %s", test_value) + + yield test_value + test_value = self.mid_point + + def register_generator(self, generator): + super(VppRFC2544Profile, self).register_generator(generator) + self.init_traffic_params(generator) + + def init_queue(self, queue): + self.queue = queue + self.queue.cancel_join_thread() + + def init_traffic_params(self, generator): + if generator.rfc2544_helper.latency: + self.enable_latency = True + self.tolerance_low = generator.rfc2544_helper.tolerance_low + self.tolerance_high = generator.rfc2544_helper.tolerance_high + self.max_rate = generator.scenario_helper.all_options.get('vpp_config', + {}).get( + 'max_rate', self.rate) + + def create_profile(self, profile_data, current_port): + streams = [] + for packet_name in profile_data: + imix = (profile_data[packet_name]. + get('outer_l2', {}).get('framesize')) + self.pkt_size, imix_sum = self.calculate_frame_size(imix) + self._create_vm(profile_data[packet_name]) + if self.max_rate > 100: + imix_data = self._create_imix_data(imix, + constants.DISTRIBUTION_IN_PACKETS) + else: + imix_data = self._create_imix_data(imix) + _streams = self._create_single_stream(current_port, imix_data, + imix_sum) + streams.extend(_streams) + return trex_stl_streams.STLProfile(streams) + + def _set_outer_l3v4_fields(self, outer_l3v4): + """ setup outer l3v4 fields from traffic profile """ + ip_params = {} + if 'proto' in outer_l3v4: + ip_params['proto'] = outer_l3v4['proto'] + self._set_proto_fields(IP, **ip_params) + + self.flow = int(outer_l3v4['count']) + src_start_ip, _ = outer_l3v4['srcip4'].split('-') + dst_start_ip, _ = outer_l3v4['dstip4'].split('-') + + self.ip_packet = Pkt.IP(src=src_start_ip, + dst=dst_start_ip, + proto=outer_l3v4['proto']) + if self.flow > 1: + dst_start_int = int(ipaddress.ip_address(str(dst_start_ip))) + dst_end_ip_new = ipaddress.ip_address( + dst_start_int + self.flow - 1) + # self._set_proto_addr(IP, SRC, outer_l3v4['srcip4'], outer_l3v4['count']) + self._set_proto_addr(IP, DST, + "{start_ip}-{end_ip}".format( + start_ip=dst_start_ip, + end_ip=str(dst_end_ip_new)), + self.flow) + + def _create_single_packet(self, size=64): + ether_packet = self.ether_packet + ip_packet = self.ip6_packet if self.ip6_packet else self.ip_packet + base_pkt = ether_packet / ip_packet + payload_len = max(0, size - len(base_pkt) - 4) + packet = trex_stl_packet_builder_scapy.STLPktBuilder( + pkt=base_pkt / self._gen_payload(payload_len), + vm=self.trex_vm) + packet_lat = trex_stl_packet_builder_scapy.STLPktBuilder( + pkt=base_pkt / self._gen_payload(payload_len)) + + return packet, packet_lat + + def _create_single_stream(self, current_port, imix_data, imix_sum, + isg=0.0): + streams = [] + for size, weight in ((int(size), float(weight)) for (size, weight) + in imix_data.items() if float(weight) > 0): + if current_port == 1: + isg += 10.0 + if self.max_rate > 100: + mode = trex_stl_streams.STLTXCont( + pps=int(weight * imix_sum / 100)) + mode_lat = mode + else: + mode = trex_stl_streams.STLTXCont( + percentage=weight * self.max_rate / 100) + mode_lat = trex_stl_streams.STLTXCont(pps=9000) + + packet, packet_lat = self._create_single_packet(size) + streams.append( + trex_stl_client.STLStream(isg=isg, packet=packet, mode=mode)) + if self.enable_latency: + pg_id = self.port_pg_id.increase_pg_id(current_port) + stl_flow = trex_stl_streams.STLFlowLatencyStats(pg_id=pg_id) + stream_lat = trex_stl_client.STLStream(isg=isg, + packet=packet_lat, + mode=mode_lat, + flow_stats=stl_flow) + streams.append(stream_lat) + return streams + + def execute_traffic(self, traffic_generator=None): + if traffic_generator is not None and self.generator is None: + self.generator = traffic_generator + + self.ports = [] + self.profiles = {} + self.port_pg_id = PortPgIDMap() + for vld_id, intfs in sorted(self.generator.networks.items()): + profile_data = self.params.get(vld_id) + if not profile_data: + continue + if (vld_id.startswith(self.DOWNLINK) and + self.generator.rfc2544_helper.correlated_traffic): + continue + for intf in intfs: + current_port = int(self.generator.port_num(intf)) + self.port_pg_id.add_port(current_port) + profile = self.create_profile(profile_data, current_port) + self.generator.client.add_streams(profile, + ports=[current_port]) + + self.ports.append(current_port) + self.profiles[current_port] = profile + + timeout = self.generator.scenario_helper.scenario_cfg["runner"][ + "duration"] + test_data = { + "test_duration": timeout, + "test_precision": self.precision, + "tolerated_loss": self.tolerance_high, + "duration": self.duration, + "packet_size": self.pkt_size, + "flow": self.flow + } + + if self.max_rate > 100: + self.binary_search_with_optimized(self.generator, self.duration, + timeout, test_data) + else: + self.binary_search(self.generator, self.duration, + self.tolerance_high, test_data) + + def binary_search_with_optimized(self, traffic_generator, duration, + timeout, test_data): + self.queue.cancel_join_thread() + algorithm = MultipleLossRatioSearch( + measurer=traffic_generator, latency=self.enable_latency, + pkt_size=self.pkt_size, + final_trial_duration=duration, + final_relative_width=self.step_interval / 100, + number_of_intermediate_phases=self.number_of_intermediate_phases, + initial_trial_duration=1, + timeout=timeout) + algorithm.init_generator(self.ports, self.port_pg_id, self.profiles, + test_data, self.queue) + return algorithm.narrow_down_ndr_and_pdr(10000, self.max_rate, + self.tolerance_high) + + def binary_search(self, traffic_generator, duration, tolerance_value, + test_data): + theor_max_thruput = 0 + result_samples = {} + + for test_value in self.bounds_iterator(LOGGING): + stats = traffic_generator.send_traffic_on_tg(self.ports, + self.port_pg_id, + duration, + str( + test_value / self.max_rate / 2), + latency=self.enable_latency) + traffic_generator.client.reset(ports=self.ports) + traffic_generator.client.clear_stats(ports=self.ports) + traffic_generator.client.remove_all_streams(ports=self.ports) + for port, profile in self.profiles.items(): + traffic_generator.client.add_streams(profile, ports=[port]) + + loss_ratio = (float(traffic_generator.loss) / float( + traffic_generator.sent)) * 100 + + samples = traffic_generator.generate_samples(stats, self.ports, + self.port_pg_id, + self.enable_latency) + samples.update(test_data) + LOGGING.info("Collect TG KPIs %s %s %s", datetime.datetime.now(), + test_value, samples) + self.queue.put(samples) + + if float(loss_ratio) > float(tolerance_value): + LOGGING.debug("Failure... Decreasing upper bound") + self.current_upper = test_value + else: + LOGGING.debug("Success! Increasing lower bound") + self.current_lower = test_value + + rate_total = float(traffic_generator.sent) / float(duration) + bandwidth_total = float(rate_total) * ( + float(self.pkt_size) + 20) * 8 / (10 ** 9) + + success_samples = {'Result_' + key: value for key, value in + samples.items()} + success_samples["Result_{}".format('PDR')] = { + "rate_total_pps": float(rate_total), + "bandwidth_total_Gbps": float(bandwidth_total), + "packet_loss_ratio": float(loss_ratio), + "packets_lost": int(traffic_generator.loss), + } + self.queue.put(success_samples) + + # Store Actual throughput for result samples + for intf in traffic_generator.vnfd_helper.interfaces: + name = intf["name"] + result_samples[name] = { + "Result_Actual_throughput": float( + success_samples["Result_{}".format(name)][ + "rx_throughput_bps"]), + } + + for intf in traffic_generator.vnfd_helper.interfaces: + name = intf["name"] + if theor_max_thruput < samples[name]["tx_throughput_bps"]: + theor_max_thruput = samples[name]['tx_throughput_bps'] + self.queue.put({'theor_max_throughput': theor_max_thruput}) + + result_samples["Result_theor_max_throughput"] = theor_max_thruput + self.queue.put(result_samples) + return result_samples diff --git a/yardstick/network_services/vnf_generic/vnf/acl_vnf.py b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py index 11a602472..69d29bf76 100644 --- a/yardstick/network_services/vnf_generic/vnf/acl_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -246,9 +246,12 @@ class AclApproxVnf(SampleVNF): 'packets_dropped': 2, } - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = AclApproxSetupEnvSetupEnvHelper - super(AclApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(AclApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) + + def wait_for_instantiate(self): + """Wait for VNF to initialize""" + self.wait_for_initialize() diff --git a/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py index 115fddcf0..d1d9667db 100644 --- a/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,8 +21,8 @@ LOG = logging.getLogger(__name__) class AgnosticVnf(base.GenericVNF): """ AgnosticVnf implementation. """ - def __init__(self, name, vnfd, task_id): - super(AgnosticVnf, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd): + super(AgnosticVnf, self).__init__(name, vnfd) def instantiate(self, scenario_cfg, context_cfg): pass diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 0fb310075..8ef96b744 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,10 +17,6 @@ import abc import logging import six -from yardstick.common import messaging -from yardstick.common.messaging import consumer -from yardstick.common.messaging import payloads -from yardstick.common.messaging import producer from yardstick.network_services.helpers.samplevnf_helper import PortPairs @@ -98,7 +94,7 @@ class VnfdHelper(dict): for interface in self.interfaces: virtual_intf = interface["virtual-interface"] if virtual_intf[key] == value: - return interface + return virtual_intf raise KeyError() def find_interface(self, **kwargs): @@ -141,70 +137,6 @@ class VnfdHelper(dict): yield port_name, port_num -class TrafficGeneratorProducer(producer.MessagingProducer): - """Class implementing the message producer for traffic generators - - This message producer must be instantiated in the process created - "run_traffic" process. - """ - def __init__(self, _id): - super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG, - _id=_id) - - def tg_method_started(self, version=1): - """Send a message to inform the traffic generation has started""" - self.send_message( - messaging.TG_METHOD_STARTED, - payloads.TrafficGeneratorPayload(version=version, iteration=0, - kpi={})) - - def tg_method_finished(self, version=1): - """Send a message to inform the traffic generation has finished""" - self.send_message( - messaging.TG_METHOD_FINISHED, - payloads.TrafficGeneratorPayload(version=version, iteration=0, - kpi={})) - - def tg_method_iteration(self, iteration, version=1, kpi=None): - """Send a message, with KPI, once an iteration has finished""" - kpi = {} if kpi is None else kpi - self.send_message( - messaging.TG_METHOD_ITERATION, - payloads.TrafficGeneratorPayload(version=version, - iteration=iteration, kpi=kpi)) - - -@six.add_metaclass(abc.ABCMeta) -class GenericVNFEndpoint(consumer.NotificationHandler): - """Endpoint class for ``GenericVNFConsumer``""" - - @abc.abstractmethod - def runner_method_start_iteration(self, ctxt, **kwargs): - """Endpoint when RUNNER_METHOD_START_ITERATION is received - - :param ctxt: (dict) {'id': <Producer ID>} - :param kwargs: (dict) ``payloads.RunnerPayload`` context - """ - - @abc.abstractmethod - def runner_method_stop_iteration(self, ctxt, **kwargs): - """Endpoint when RUNNER_METHOD_STOP_ITERATION is received - - :param ctxt: (dict) {'id': <Producer ID>} - :param kwargs: (dict) ``payloads.RunnerPayload`` context - """ - - -class GenericVNFConsumer(consumer.MessagingConsumer): - """MQ consumer for ``GenericVNF`` derived classes""" - - def __init__(self, ctx_ids, endpoints): - if not isinstance(endpoints, list): - endpoints = [endpoints] - super(GenericVNFConsumer, self).__init__(messaging.TOPIC_RUNNER, - ctx_ids, endpoints) - - @six.add_metaclass(abc.ABCMeta) class GenericVNF(object): """Class providing file-like API for generic VNF implementation @@ -217,9 +149,8 @@ class GenericVNF(object): UPLINK = PortPairs.UPLINK DOWNLINK = PortPairs.DOWNLINK - def __init__(self, name, vnfd, task_id): + def __init__(self, name, vnfd): self.name = name - self._task_id = task_id self.vnfd_helper = VnfdHelper(vnfd) # List of statistics we can obtain from this VNF # - ETSI MANO 6.3.1.1 monitoring_parameter @@ -280,11 +211,10 @@ class GenericVNF(object): class GenericTrafficGen(GenericVNF): """Class providing file-like API for generic traffic generator""" - def __init__(self, name, vnfd, task_id): - super(GenericTrafficGen, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd): + super(GenericTrafficGen, self).__init__(name, vnfd) self.runs_traffic = True self.traffic_finished = False - self._mq_producer = None @abc.abstractmethod def run_traffic(self, traffic_profile): @@ -355,16 +285,3 @@ class GenericTrafficGen(GenericVNF): :return: True/False """ pass - - @staticmethod - def _setup_mq_producer(id): - """Setup the TG MQ producer to send messages between processes - - :return: (derived class from ``MessagingProducer``) MQ producer object - """ - return TrafficGeneratorProducer(id) - - def get_mq_producer_id(self): - """Return the MQ producer ID if initialized""" - if self._mq_producer: - return self._mq_producer.id diff --git a/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py index 14f1e2e97..ee4a581b1 100644 --- a/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -85,12 +85,12 @@ class CgnaptApproxVnf(SampleVNF): "packets_dropped": 4, } - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = CgnaptApproxSetupEnvHelper - super(CgnaptApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(CgnaptApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) def _vnf_up_post(self): super(CgnaptApproxVnf, self)._vnf_up_post() @@ -120,3 +120,7 @@ class CgnaptApproxVnf(SampleVNF): self.vnf_execute(cmd) time.sleep(WAIT_FOR_STATIC_NAPT) + + def wait_for_instantiate(self): + """Wait for VNF to initialize""" + self.wait_for_initialize() diff --git a/yardstick/network_services/vnf_generic/vnf/epc_vnf.py b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py index 66d16d07f..8112963e9 100644 --- a/yardstick/network_services/vnf_generic/vnf/epc_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,8 +21,8 @@ LOG = logging.getLogger(__name__) class EPCVnf(base.GenericVNF): - def __init__(self, name, vnfd, task_id): - super(EPCVnf, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd): + super(EPCVnf, self).__init__(name, vnfd) def instantiate(self, scenario_cfg, context_cfg): """Prepare VNF for operation and start the VNF process/VM diff --git a/yardstick/network_services/vnf_generic/vnf/ipsec_vnf.py b/yardstick/network_services/vnf_generic/vnf/ipsec_vnf.py new file mode 100644 index 000000000..1961ac1b1 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/ipsec_vnf.py @@ -0,0 +1,498 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re +import time +from collections import Counter +from enum import Enum + +from yardstick.benchmark.contexts.base import Context +from yardstick.common.process import check_if_process_failed +from yardstick.network_services import constants +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF +from yardstick.network_services.vnf_generic.vnf.vpp_helpers import \ + VppSetupEnvHelper, VppConfigGenerator + +LOG = logging.getLogger(__name__) + + +class CryptoAlg(Enum): + """Encryption algorithms.""" + AES_CBC_128 = ('aes-cbc-128', 'AES-CBC', 16) + AES_CBC_192 = ('aes-cbc-192', 'AES-CBC', 24) + AES_CBC_256 = ('aes-cbc-256', 'AES-CBC', 32) + AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20) + + def __init__(self, alg_name, scapy_name, key_len): + self.alg_name = alg_name + self.scapy_name = scapy_name + self.key_len = key_len + + +class IntegAlg(Enum): + """Integrity algorithms.""" + SHA1_96 = ('sha1-96', 'HMAC-SHA1-96', 20) + SHA_256_128 = ('sha-256-128', 'SHA2-256-128', 32) + SHA_384_192 = ('sha-384-192', 'SHA2-384-192', 48) + SHA_512_256 = ('sha-512-256', 'SHA2-512-256', 64) + AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20) + + def __init__(self, alg_name, scapy_name, key_len): + self.alg_name = alg_name + self.scapy_name = scapy_name + self.key_len = key_len + + +class VipsecApproxSetupEnvHelper(VppSetupEnvHelper): + DEFAULT_IPSEC_VNF_CFG = { + 'crypto_type': 'SW_cryptodev', + 'rxq': 1, + 'worker_config': '1C/1T', + 'worker_threads': 1, + } + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(VipsecApproxSetupEnvHelper, self).__init__( + vnfd_helper, ssh_helper, scenario_helper) + + def _get_crypto_type(self): + vnf_cfg = self.scenario_helper.options.get('vnf_config', + self.DEFAULT_IPSEC_VNF_CFG) + return vnf_cfg.get('crypto_type', 'SW_cryptodev') + + def _get_crypto_algorithms(self): + vpp_cfg = self.scenario_helper.all_options.get('vpp_config', {}) + return vpp_cfg.get('crypto_algorithms', 'aes-gcm') + + def _get_n_tunnels(self): + vpp_cfg = self.scenario_helper.all_options.get('vpp_config', {}) + return vpp_cfg.get('tunnels', 1) + + def _get_n_connections(self): + try: + flow_cfg = self.scenario_helper.all_options['flow'] + return flow_cfg['count'] + except KeyError: + raise KeyError("Missing flow definition in scenario section" + + " of the task definition file") + + def _get_flow_src_start_ip(self): + node_name = self.find_encrypted_data_interface()["node_name"] + try: + flow_cfg = self.scenario_helper.all_options['flow'] + src_ips = flow_cfg['src_ip'] + dst_ips = flow_cfg['dst_ip'] + except KeyError: + raise KeyError("Missing flow definition in scenario section" + + " of the task definition file") + + for src, dst in zip(src_ips, dst_ips): + flow_src_start_ip, _ = src.split('-') + flow_dst_start_ip, _ = dst.split('-') + + if node_name == "vnf__0": + return flow_src_start_ip + elif node_name == "vnf__1": + return flow_dst_start_ip + + def _get_flow_dst_start_ip(self): + node_name = self.find_encrypted_data_interface()["node_name"] + try: + flow_cfg = self.scenario_helper.all_options['flow'] + src_ips = flow_cfg['src_ip'] + dst_ips = flow_cfg['dst_ip'] + except KeyError: + raise KeyError("Missing flow definition in scenario section" + + " of the task definition file") + + for src, dst in zip(src_ips, dst_ips): + flow_src_start_ip, _ = src.split('-') + flow_dst_start_ip, _ = dst.split('-') + + if node_name == "vnf__0": + return flow_dst_start_ip + elif node_name == "vnf__1": + return flow_src_start_ip + + def build_config(self): + vnf_cfg = self.scenario_helper.options.get('vnf_config', + self.DEFAULT_IPSEC_VNF_CFG) + rxq = vnf_cfg.get('rxq', 1) + phy_cores = vnf_cfg.get('worker_threads', 1) + # worker_config = vnf_cfg.get('worker_config', '1C/1T').split('/')[1].lower() + + vpp_cfg = self.create_startup_configuration_of_vpp() + self.add_worker_threads_and_rxqueues(vpp_cfg, phy_cores, rxq) + self.add_pci_devices(vpp_cfg) + + frame_size_cfg = self.scenario_helper.all_options.get('framesize', {}) + uplink_cfg = frame_size_cfg.get('uplink', {}) + downlink_cfg = frame_size_cfg.get('downlink', {}) + framesize = min(self.calculate_frame_size(uplink_cfg), + self.calculate_frame_size(downlink_cfg)) + if framesize < 1522: + vpp_cfg.add_dpdk_no_multi_seg() + + crypto_algorithms = self._get_crypto_algorithms() + if crypto_algorithms == 'aes-gcm': + self.add_dpdk_cryptodev(vpp_cfg, 'aesni_gcm', phy_cores) + elif crypto_algorithms == 'cbc-sha1': + self.add_dpdk_cryptodev(vpp_cfg, 'aesni_mb', phy_cores) + + vpp_cfg.add_dpdk_dev_default_rxd(2048) + vpp_cfg.add_dpdk_dev_default_txd(2048) + self.apply_config(vpp_cfg, True) + self.update_vpp_interface_data() + + def setup_vnf_environment(self): + resource = super(VipsecApproxSetupEnvHelper, + self).setup_vnf_environment() + + self.start_vpp_service() + # for QAT device DH895xCC, the number of VFs is required as 32 + if self._get_crypto_type() == 'HW_cryptodev': + sriov_numvfs = self.get_sriov_numvfs( + self.find_encrypted_data_interface()["vpci"]) + if sriov_numvfs != 32: + self.crypto_device_init( + self.find_encrypted_data_interface()["vpci"], 32) + + self._update_vnfd_helper(self.sys_cores.get_cpu_layout()) + self.update_vpp_interface_data() + self.iface_update_numa() + + return resource + + @staticmethod + def calculate_frame_size(frame_cfg): + if not frame_cfg: + return 64 + + imix_count = {size.upper().replace('B', ''): int(weight) + for size, weight in frame_cfg.items()} + imix_sum = sum(imix_count.values()) + if imix_sum <= 0: + return 64 + packets_total = sum([int(size) * weight + for size, weight in imix_count.items()]) + return packets_total / imix_sum + + def check_status(self): + ipsec_created = False + cmd = "vppctl show int" + _, stdout, _ = self.ssh_helper.execute(cmd) + entries = re.split(r"\n+", stdout) + tmp = [re.split(r"\s\s+", entry, 5) for entry in entries] + + for item in tmp: + if isinstance(item, list): + if item[0] and item[0] != 'local0': + if "ipsec" in item[0] and not ipsec_created: + ipsec_created = True + if len(item) > 2 and item[2] == 'down': + return False + return ipsec_created + + def get_vpp_statistics(self): + cmd = "vppctl show int {intf}" + result = {} + for interface in self.vnfd_helper.interfaces: + iface_name = self.get_value_by_interface_key( + interface["virtual-interface"]["ifname"], "vpp_name") + command = cmd.format(intf=iface_name) + _, stdout, _ = self.ssh_helper.execute(command) + result.update( + self.parser_vpp_stats(interface["virtual-interface"]["ifname"], + iface_name, stdout)) + self.ssh_helper.execute("vppctl clear interfaces") + return result + + @staticmethod + def parser_vpp_stats(interface, iface_name, stats): + packets_in = 0 + packets_fwd = 0 + packets_dropped = 0 + result = {} + + entries = re.split(r"\n+", stats) + tmp = [re.split(r"\s\s+", entry, 5) for entry in entries] + + for item in tmp: + if isinstance(item, list): + if item[0] == iface_name and len(item) >= 5: + if item[3] == 'rx packets': + packets_in = int(item[4]) + elif item[4] == 'rx packets': + packets_in = int(item[5]) + elif len(item) == 3: + if item[1] == 'tx packets': + packets_fwd = int(item[2]) + elif item[1] == 'drops' or item[1] == 'rx-miss': + packets_dropped = int(item[2]) + if packets_dropped == 0 and packets_in > 0 and packets_fwd > 0: + packets_dropped = abs(packets_fwd - packets_in) + + result[interface] = { + 'packets_in': packets_in, + 'packets_fwd': packets_fwd, + 'packets_dropped': packets_dropped, + } + + return result + + def create_ipsec_tunnels(self): + self.initialize_ipsec() + + # TODO generate the same key + crypto_algorithms = self._get_crypto_algorithms() + if crypto_algorithms == 'aes-gcm': + encr_alg = CryptoAlg.AES_GCM_128 + auth_alg = IntegAlg.AES_GCM_128 + encr_key = 'LNYZXMBQDKESNLREHJMS' + auth_key = 'SWGLDTYZSQKVBZZMPIEV' + elif crypto_algorithms == 'cbc-sha1': + encr_alg = CryptoAlg.AES_CBC_128 + auth_alg = IntegAlg.SHA1_96 + encr_key = 'IFEMSHYLCZIYFUTT' + auth_key = 'PEALEIPSCPTRHYJSDXLY' + + self.execute_script("enable_dpdk_traces.vat", json_out=False) + self.execute_script("enable_vhost_user_traces.vat", json_out=False) + self.execute_script("enable_memif_traces.vat", json_out=False) + + node_name = self.find_encrypted_data_interface()["node_name"] + n_tunnels = self._get_n_tunnels() + n_connections = self._get_n_connections() + flow_dst_start_ip = self._get_flow_dst_start_ip() + if node_name == "vnf__0": + self.vpp_create_ipsec_tunnels( + self.find_encrypted_data_interface()["local_ip"], + self.find_encrypted_data_interface()["peer_intf"]["local_ip"], + self.find_encrypted_data_interface()["ifname"], + n_tunnels, n_connections, encr_alg, encr_key, auth_alg, + auth_key, flow_dst_start_ip) + elif node_name == "vnf__1": + self.vpp_create_ipsec_tunnels( + self.find_encrypted_data_interface()["local_ip"], + self.find_encrypted_data_interface()["peer_intf"]["local_ip"], + self.find_encrypted_data_interface()["ifname"], + n_tunnels, n_connections, encr_alg, encr_key, auth_alg, + auth_key, flow_dst_start_ip, 20000, 10000) + + def find_raw_data_interface(self): + try: + return self.vnfd_helper.find_virtual_interface(vld_id="uplink_0") + except KeyError: + return self.vnfd_helper.find_virtual_interface(vld_id="downlink_0") + + def find_encrypted_data_interface(self): + return self.vnfd_helper.find_virtual_interface(vld_id="ciphertext") + + def create_startup_configuration_of_vpp(self): + vpp_config_generator = VppConfigGenerator() + vpp_config_generator.add_unix_log() + vpp_config_generator.add_unix_cli_listen() + vpp_config_generator.add_unix_nodaemon() + vpp_config_generator.add_unix_coredump() + vpp_config_generator.add_dpdk_socketmem('1024,1024') + vpp_config_generator.add_dpdk_no_tx_checksum_offload() + vpp_config_generator.add_dpdk_log_level('debug') + for interface in self.vnfd_helper.interfaces: + vpp_config_generator.add_dpdk_uio_driver( + interface["virtual-interface"]["driver"]) + vpp_config_generator.add_heapsize('4G') + # TODO Enable configuration depend on VPP version + vpp_config_generator.add_statseg_size('4G') + vpp_config_generator.add_plugin('disable', ['default']) + vpp_config_generator.add_plugin('enable', ['dpdk_plugin.so']) + vpp_config_generator.add_ip6_hash_buckets('2000000') + vpp_config_generator.add_ip6_heap_size('4G') + vpp_config_generator.add_ip_heap_size('4G') + return vpp_config_generator + + def add_worker_threads_and_rxqueues(self, vpp_cfg, phy_cores, + rx_queues=None): + thr_count_int = phy_cores + cpu_count_int = phy_cores + num_mbufs_int = 32768 + + numa_list = [] + + if_list = [self.find_encrypted_data_interface()["ifname"], + self.find_raw_data_interface()["ifname"]] + for if_key in if_list: + try: + numa_list.append( + self.get_value_by_interface_key(if_key, 'numa_node')) + except KeyError: + pass + numa_cnt_mc = Counter(numa_list).most_common() + + if numa_cnt_mc and numa_cnt_mc[0][0] is not None and \ + numa_cnt_mc[0][0] != -1: + numa = numa_cnt_mc[0][0] + elif len(numa_cnt_mc) > 1 and numa_cnt_mc[0][0] == -1: + numa = numa_cnt_mc[1][0] + else: + numa = 0 + + try: + smt_used = self.sys_cores.is_smt_enabled() + except KeyError: + smt_used = False + + cpu_main = self.sys_cores.cpu_list_per_node_str(numa, skip_cnt=1, + cpu_cnt=1) + cpu_wt = self.sys_cores.cpu_list_per_node_str(numa, skip_cnt=2, + cpu_cnt=cpu_count_int, + smt_used=smt_used) + + if smt_used: + thr_count_int = 2 * cpu_count_int + + if rx_queues is None: + rxq_count_int = int(thr_count_int / 2) + else: + rxq_count_int = rx_queues + + if rxq_count_int == 0: + rxq_count_int = 1 + + num_mbufs_int = num_mbufs_int * rxq_count_int + + vpp_cfg.add_cpu_main_core(cpu_main) + vpp_cfg.add_cpu_corelist_workers(cpu_wt) + vpp_cfg.add_dpdk_dev_default_rxq(rxq_count_int) + vpp_cfg.add_dpdk_num_mbufs(num_mbufs_int) + + def add_pci_devices(self, vpp_cfg): + pci_devs = [self.find_encrypted_data_interface()["vpci"], + self.find_raw_data_interface()["vpci"]] + vpp_cfg.add_dpdk_dev(*pci_devs) + + def add_dpdk_cryptodev(self, vpp_cfg, sw_pmd_type, count): + crypto_type = self._get_crypto_type() + smt_used = self.sys_cores.is_smt_enabled() + cryptodev = self.find_encrypted_data_interface()["vpci"] + socket_id = self.get_value_by_interface_key( + self.find_encrypted_data_interface()["ifname"], "numa_node") + + if smt_used: + thr_count_int = count * 2 + if crypto_type == 'HW_cryptodev': + vpp_cfg.add_dpdk_cryptodev(thr_count_int, cryptodev) + else: + vpp_cfg.add_dpdk_sw_cryptodev(sw_pmd_type, socket_id, + thr_count_int) + else: + thr_count_int = count + if crypto_type == 'HW_cryptodev': + vpp_cfg.add_dpdk_cryptodev(thr_count_int, cryptodev) + else: + vpp_cfg.add_dpdk_sw_cryptodev(sw_pmd_type, socket_id, + thr_count_int) + + def initialize_ipsec(self): + flow_src_start_ip = self._get_flow_src_start_ip() + + self.set_interface_state( + self.find_encrypted_data_interface()["ifname"], 'up') + self.set_interface_state(self.find_raw_data_interface()["ifname"], + 'up') + self.vpp_interfaces_ready_wait() + self.vpp_set_interface_mtu( + self.find_encrypted_data_interface()["ifname"]) + self.vpp_set_interface_mtu(self.find_raw_data_interface()["ifname"]) + self.vpp_interfaces_ready_wait() + + self.set_ip(self.find_encrypted_data_interface()["ifname"], + self.find_encrypted_data_interface()["local_ip"], 24) + self.set_ip(self.find_raw_data_interface()["ifname"], + self.find_raw_data_interface()["local_ip"], + 24) + + self.add_arp_on_dut(self.find_encrypted_data_interface()["ifname"], + self.find_encrypted_data_interface()["peer_intf"][ + "local_ip"], + self.find_encrypted_data_interface()["peer_intf"][ + "local_mac"]) + self.add_arp_on_dut(self.find_raw_data_interface()["ifname"], + self.find_raw_data_interface()["peer_intf"][ + "local_ip"], + self.find_raw_data_interface()["peer_intf"][ + "local_mac"]) + + self.vpp_route_add(flow_src_start_ip, 8, + self.find_raw_data_interface()["peer_intf"][ + "local_ip"], + self.find_raw_data_interface()["ifname"]) + + +class VipsecApproxVnf(SampleVNF): + """ This class handles vIPSEC VNF model-driver definitions """ + + APP_NAME = 'vIPSEC' + APP_WORD = 'vipsec' + WAIT_TIME = 20 + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = VipsecApproxSetupEnvHelper + super(VipsecApproxVnf, self).__init__( + name, vnfd, setup_env_helper_type, + resource_helper_type) + + def _run(self): + # we can't share ssh paramiko objects to force new connection + self.ssh_helper.drop_connection() + # kill before starting + self.setup_helper.kill_vnf() + self._build_config() + self.setup_helper.create_ipsec_tunnels() + + def wait_for_instantiate(self): + time.sleep(self.WAIT_TIME) + while True: + status = self.setup_helper.check_status() + if not self._vnf_process.is_alive() and not status: + raise RuntimeError("%s VNF process died." % self.APP_NAME) + LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) + time.sleep(self.WAIT_TIME_FOR_SCRIPT) + status = self.setup_helper.check_status() + if status: + LOG.info("%s VNF is up and running.", self.APP_NAME) + self._vnf_up_post() + return self._vnf_process.exitcode + + def terminate(self): + self.setup_helper.kill_vnf() + self._tear_down() + self.resource_helper.stop_collect() + if self._vnf_process is not None: + # be proper and join first before we kill + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() + + def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process, 0.01) + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + result = {"physical_node": physical_node} + result["collect_stats"] = self.setup_helper.get_vpp_statistics() + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) + return result diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index cd3035ef8..3507315f2 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -347,7 +347,7 @@ class ProxSocketHelper(object): LOG.debug("Received data from socket: [%s]", ret_str) return status, ret_str - def get_data(self, pkt_dump_only=False, timeout=0.01): + def get_data(self, pkt_dump_only=False, timeout=10.0): """ read data from the socket """ # This method behaves slightly differently depending on whether it is @@ -1146,7 +1146,7 @@ class ProxResourceHelper(ClientResourceHelper): self._test_type = self.setup_helper.find_in_section('global', 'name', None) return self._test_type - def run_traffic(self, traffic_profile, *args): + def run_traffic(self, traffic_profile): self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 diff --git a/yardstick/network_services/vnf_generic/vnf/prox_irq.py b/yardstick/network_services/vnf_generic/vnf/prox_irq.py index dda26b0fe..614066e46 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_irq.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_irq.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,13 +28,13 @@ LOG = logging.getLogger(__name__) class ProxIrq(SampleVNFTrafficGen): - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): vnfd_cpy = copy.deepcopy(vnfd) - super(ProxIrq, self).__init__(name, vnfd_cpy, task_id) + super(ProxIrq, self).__init__(name, vnfd_cpy) self._vnf_wrapper = ProxApproxVnf( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + name, vnfd, setup_env_helper_type, resource_helper_type) self.bin_path = get_nsb_option('bin_path', '') self.name = self._vnf_wrapper.name self.ssh_helper = self._vnf_wrapper.ssh_helper @@ -83,9 +83,9 @@ class ProxIrqVNF(ProxIrq, SampleVNFTrafficGen): APP_NAME = 'ProxIrqVNF' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - ProxIrq.__init__(self, name, vnfd, task_id, setup_env_helper_type, + ProxIrq.__init__(self, name, vnfd, setup_env_helper_type, resource_helper_type) self.start_test_time = None @@ -150,9 +150,9 @@ class ProxIrqGen(ProxIrq, SampleVNFTrafficGen): APP_NAME = 'ProxIrqGen' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - ProxIrq.__init__(self, name, vnfd, task_id, setup_env_helper_type, + ProxIrq.__init__(self, name, vnfd, setup_env_helper_type, resource_helper_type) self.start_test_time = None self.end_test_time = None diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py index c3b50369b..c9abc757e 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -35,8 +35,7 @@ class ProxApproxVnf(SampleVNF): VNF_PROMPT = "PROX started" LUA_PARAMETER_NAME = "sut" - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = ProxDpdkVnfSetupEnvHelper @@ -47,8 +46,8 @@ class ProxApproxVnf(SampleVNF): self.prev_packets_sent = 0 self.prev_tsc = 0 self.tsc_hz = 0 - super(ProxApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + super(ProxApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) def _vnf_up_post(self): self.resource_helper.up_post() diff --git a/yardstick/network_services/vnf_generic/vnf/router_vnf.py b/yardstick/network_services/vnf_generic/vnf/router_vnf.py index e99de9cb3..f1486bdb4 100644 --- a/yardstick/network_services/vnf_generic/vnf/router_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/router_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -34,8 +34,7 @@ class RouterVNF(SampleVNF): WAIT_TIME = 1 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = DpdkVnfSetupEnvHelper @@ -43,8 +42,7 @@ class RouterVNF(SampleVNF): vnfd['mgmt-interface'].pop("pkey", "") vnfd['mgmt-interface']['password'] = 'password' - super(RouterVNF, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + super(RouterVNF, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 8833b88f2..a369a3ae6 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2018 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,11 +14,10 @@ import logging import decimal -from multiprocessing import Queue, Value, Process +from multiprocessing import Queue, Value, Process, JoinableQueue import os import posixpath import re -import uuid import subprocess import time @@ -232,6 +231,9 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): exit_status = self.dpdk_bind_helper.check_dpdk_driver() if exit_status == 0: return + else: + LOG.critical("DPDK Driver not installed") + return def _setup_resources(self): # what is this magic? how do we know which socket is for which port? @@ -397,26 +399,26 @@ class ClientResourceHelper(ResourceHelper): time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) - def run_traffic(self, traffic_profile, mq_producer): + def run_traffic(self, traffic_profile): # if we don't do this we can hang waiting for the queue to drain # have to do this in the subprocess self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path - mq_producer.tg_method_started() try: self._build_ports() self.client = self._connect() + if self.client is None: + LOG.critical("Failure to Connect ... unable to continue") + return + self.client.reset(ports=self.all_ports) self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) - iteration_index = 0 while self._terminated.value == 0: - iteration_index += 1 if self._run_traffic_once(traffic_profile): self._terminated.value = 1 - mq_producer.tg_method_iteration(iteration_index) self.client.stop(self.all_ports) self.client.disconnect() @@ -427,8 +429,6 @@ class ClientResourceHelper(ResourceHelper): return # return if trex/tg server is stopped. raise - mq_producer.tg_method_finished() - def terminate(self): self._terminated.value = 1 # stop client @@ -461,22 +461,35 @@ class ClientResourceHelper(ResourceHelper): server=self.vnfd_helper.mgmt_interface["ip"], verbose_level=LoggerApi.VERBOSE_QUIET) - # try to connect with 5s intervals, 30s max + # try to connect with 5s intervals for idx in range(6): try: client.connect() - break + for idx2 in range(6): + if client.is_connected(): + return client + LOG.info("Waiting to confirm connection %s .. Attempt %s", + idx, idx2) + time.sleep(1) + client.disconnect(stop_traffic=True, release_ports=True) except STLError: LOG.info("Unable to connect to Trex Server.. Attempt %s", idx) time.sleep(5) - return client + if client.is_connected(): + return client + else: + LOG.critical("Connection failure ..TRex username: %s server: %s", + self.vnfd_helper.mgmt_interface["user"], + self.vnfd_helper.mgmt_interface["ip"]) + return None class Rfc2544ResourceHelper(object): DEFAULT_CORRELATED_TRAFFIC = False DEFAULT_LATENCY = False DEFAULT_TOLERANCE = '0.0001 - 0.0001' + DEFAULT_RESOLUTION = '0.1' def __init__(self, scenario_helper): super(Rfc2544ResourceHelper, self).__init__() @@ -488,6 +501,7 @@ class Rfc2544ResourceHelper(object): self._tolerance_low = None self._tolerance_high = None self._tolerance_precision = None + self._resolution = None @property def rfc2544(self): @@ -527,6 +541,13 @@ class Rfc2544ResourceHelper(object): self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY) return self._latency + @property + def resolution(self): + if self._resolution is None: + self._resolution = float(self.get_rfc2544('resolution', + self.DEFAULT_RESOLUTION)) + return self._resolution + def get_rfc2544(self, name, default=None): return self.rfc2544.get(name, default) @@ -619,7 +640,6 @@ class ScenarioHelper(object): test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT) return test_duration if test_duration > test_timeout else test_timeout - class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ @@ -629,9 +649,8 @@ class SampleVNF(GenericVNF): APP_NAME = "SampleVNF" # we run the VNF interactively, so the ssh command will timeout after this long - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): - super(SampleVNF, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + super(SampleVNF, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -700,8 +719,8 @@ class SampleVNF(GenericVNF): scenarios: - type: NSPerf nodes: - tg__0: trafficgen_1.yardstick - vnf__0: vnf.yardstick + tg__0: trafficgen_0.yardstick + vnf__0: vnf_0.yardstick options: collectd: <options> # COLLECTD priority 3 @@ -764,6 +783,53 @@ class SampleVNF(GenericVNF): # by other VNF output self.q_in.put('\r\n') + def wait_for_initialize(self): + buf = [] + vnf_prompt_found = False + prompt_command = '\r\n' + script_name = 'non_existent_script_name' + done_string = 'Cannot open file "{}"'.format(script_name) + time.sleep(self.WAIT_TIME) # Give some time for config to load + while True: + if not self._vnf_process.is_alive(): + raise RuntimeError("%s VNF process died." % self.APP_NAME) + while self.q_out.qsize() > 0: + buf.append(self.q_out.get()) + message = ''.join(buf) + + if self.VNF_PROMPT in message and not vnf_prompt_found: + # Once we got VNF promt, it doesn't mean that the VNF is + # up and running/initialized completely. But we can run + # addition (any) VNF command and wait for it to complete + # as it will be finished ONLY at the end of the VNF + # initialization. So, this approach can be used to + # indentify that VNF is completely initialized. + LOG.info("Got %s VNF prompt.", self.APP_NAME) + prompt_command = "run {}\r\n".format(script_name) + self.q_in.put(prompt_command) + # Cut the buffer since we are not interesting to find + # the VNF prompt anymore + prompt_pos = message.find(self.VNF_PROMPT) + buf = [message[prompt_pos + len(self.VNF_PROMPT):]] + vnf_prompt_found = True + continue + + if done_string in message: + LOG.info("%s VNF is up and running.", self.APP_NAME) + self._vnf_up_post() + self.queue_wrapper.clear() + return self._vnf_process.exitcode + + if "PANIC" in message: + raise RuntimeError("Error starting %s VNF." % + self.APP_NAME) + + LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) + time.sleep(self.WAIT_TIME_FOR_SCRIPT) + # Send command again to display the expected prompt in case the + # expected text was corrupted by other VNF output + self.q_in.put(prompt_command) + def start_collect(self): self.resource_helper.start_collect() @@ -862,9 +928,8 @@ class SampleVNFTrafficGen(GenericTrafficGen): APP_NAME = 'Sample' RUN_WAIT = 1 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): - super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + super(SampleVNFTrafficGen, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -886,6 +951,39 @@ class SampleVNFTrafficGen(GenericTrafficGen): self.traffic_finished = False self._tg_process = None self._traffic_process = None + self._tasks_queue = JoinableQueue() + self._result_queue = Queue() + + def _test_runner(self, traffic_profile, tasks, results): + self.resource_helper.run_test(traffic_profile, tasks, results) + + def _init_traffic_process(self, traffic_profile): + name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, + traffic_profile.__class__.__name__, + os.getpid()) + self._traffic_process = Process(name=name, target=self._test_runner, + args=( + traffic_profile, self._tasks_queue, + self._result_queue)) + + self._traffic_process.start() + while self.resource_helper.client_started.value == 0: + time.sleep(1) + if not self._traffic_process.is_alive(): + break + + def run_traffic_once(self, traffic_profile): + if self.resource_helper.client_started.value == 0: + self._init_traffic_process(traffic_profile) + + # continue test - run next iteration + LOG.info("Run next iteration ...") + self._tasks_queue.put('RUN_TRAFFIC') + + def wait_on_traffic(self): + self._tasks_queue.join() + result = self._result_queue.get() + return result def _start_server(self): # we can't share ssh paramiko objects to force new connection @@ -923,13 +1021,12 @@ class SampleVNFTrafficGen(GenericTrafficGen): LOG.info("%s TG Server is up and running.", self.APP_NAME) return self._tg_process.exitcode - def _traffic_runner(self, traffic_profile, mq_id): + def _traffic_runner(self, traffic_profile): # always drop connections first thing in new processes # so we don't get paramiko errors self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) - self._mq_producer = self._setup_mq_producer(mq_id) - self.resource_helper.run_traffic(traffic_profile, self._mq_producer) + self.resource_helper.run_traffic(traffic_profile) def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. @@ -939,12 +1036,10 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, - traffic_profile.__class__.__name__, + name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, os.getpid()) - self._traffic_process = Process( - name=name, target=self._traffic_runner, - args=(traffic_profile, uuid.uuid1().int)) + self._traffic_process = Process(name=name, target=self._traffic_runner, + args=(traffic_profile,)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: @@ -953,6 +1048,8 @@ class SampleVNFTrafficGen(GenericTrafficGen): if not self._traffic_process.is_alive(): break + return self._traffic_process.is_alive() + def collect_kpi(self): # check if the tg processes have exited physical_node = Context.get_physical_node_from_server( diff --git a/yardstick/network_services/vnf_generic/vnf/tg_imsbench_sipp.py b/yardstick/network_services/vnf_generic/vnf/tg_imsbench_sipp.py new file mode 100644 index 000000000..70557b848 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_imsbench_sipp.py @@ -0,0 +1,143 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from collections import deque + +from yardstick.network_services.vnf_generic.vnf import sample_vnf + +LOG = logging.getLogger(__name__) + + +class SippSetupEnvHelper(sample_vnf.SetupEnvHelper): + APP_NAME = "ImsbenchSipp" + + +class SippResourceHelper(sample_vnf.ClientResourceHelper): + pass + + +class SippVnf(sample_vnf.SampleVNFTrafficGen): + """ + This class calls the test script from TG machine, then gets the result file + from IMS machine. After that, the result file is handled line by line, and + is updated to database. + """ + + APP_NAME = "ImsbenchSipp" + APP_WORD = "ImsbenchSipp" + VNF_TYPE = "ImsbenchSipp" + HW_OFFLOADING_NFVI_TYPES = {'baremetal', 'sriov'} + RESULT = "/tmp/final_result.dat" + SIPP_RESULT = "/tmp/sipp_dat_files/final_result.dat" + LOCAL_PATH = "/tmp" + CMD = "./SIPp_benchmark.bash {} {} {} '{}'" + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = SippResourceHelper + if setup_env_helper_type is None: + setup_env_helper_type = SippSetupEnvHelper + super(SippVnf, self).__init__( + name, vnfd, setup_env_helper_type, resource_helper_type) + self.params = "" + self.pcscf_ip = self.vnfd_helper.interfaces[0]["virtual-interface"]\ + ["peer_intf"]["local_ip"] + self.sipp_ip = self.vnfd_helper.interfaces[0]["virtual-interface"]\ + ["local_ip"] + self.media_ip = self.vnfd_helper.interfaces[1]["virtual-interface"]\ + ["local_ip"] + self.queue = "" + self.count = 0 + + def instantiate(self, scenario_cfg, context_cfg): + super(SippVnf, self).instantiate(scenario_cfg, context_cfg) + scenario_cfg = {} + _params = [("port", 5060), ("start_user", 1), ("end_user", 10000), + ("init_reg_cps", 50), ("init_reg_max", 5000), ("reg_cps", 50), + ("reg_step", 10), ("rereg_cps", 10), ("rereg_step", 5), + ("dereg_cps", 10), ("dereg_step", 5), ("msgc_cps", 10), + ("msgc_step", 2), ("run_mode", "rtp"), ("call_cps", 10), + ("hold_time", 15), ("call_step", 5)] + + self.params = ';'.join([str(scenario_cfg.get("options", {}).get(k, v)) + for k, v in dict(_params).items()]) + + def wait_for_instantiate(self): + pass + + def get_result_files(self): + self.ssh_helper.get(self.SIPP_RESULT, self.LOCAL_PATH, True) + + # Example of result file: + # cat /tmp/final_result.dat + # timestamp:1000 reg:100 reg_saps:0 + # timestamp:2000 reg:100 reg_saps:50 + # timestamp:3000 reg:100 reg_saps:50 + # timestamp:4000 reg:100 reg_saps:50 + # ... + # reg_Requested_prereg:50 + # reg_Effective_prereg:49.49 + # reg_DOC:0 + # ... + @staticmethod + def handle_result_files(filename): + with open(filename, 'r') as f: + content = f.readlines() + result = [{k: round(float(v), 2) for k, v in [i.split(":", 1) for i in x.split()]} + for x in content if x] + return deque(result) + + def run_traffic(self, traffic_profile): + traffic_profile.execute_traffic(self) + cmd = self.CMD.format(self.sipp_ip, self.media_ip, + self.pcscf_ip, self.params) + self.ssh_helper.execute(cmd, None, 3600, False) + self.get_result_files() + self.queue = self.handle_result_files(self.RESULT) + + def collect_kpi(self): + result = {} + try: + result = self.queue.popleft() + except IndexError: + pass + return result + + @staticmethod + def count_line_num(fname): + try: + with open(fname, 'r') as f: + return sum(1 for line in f) + except IOError: + return 0 + + def is_ended(self): + """ + The test will end when the results are pushed into database. + It does not depend on the "duration" value, so this value will be set + enough big to make sure that the test will end before duration. + """ + num_lines = self.count_line_num(self.RESULT) + if self.count == num_lines: + LOG.debug('TG IS ENDED.....................') + self.count = 0 + return True + self.count += 1 + return False + + def terminate(self): + LOG.debug('TERMINATE:.....................') + self.resource_helper.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ixload.py b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py index d25402740..38b00a4b2 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ixload.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py @@ -126,13 +126,12 @@ class IxLoadResourceHelper(sample_vnf.ClientResourceHelper): class IxLoadTrafficGen(sample_vnf.SampleVNFTrafficGen): - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = IxLoadResourceHelper - super(IxLoadTrafficGen, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + super(IxLoadTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) self._result = {} def update_gateways(self, links): @@ -143,7 +142,12 @@ class IxLoadTrafficGen(sample_vnf.SampleVNFTrafficGen): "external-interface"] if intf["virtual-interface"]["vld_id"] == name) - links[name]["ip"]["gateway"] = gateway + try: + links[name]["ip"]["gateway"] = gateway + except KeyError: + LOG.error("Invalid traffic profile: No IP section defined for %s", name) + raise + except StopIteration: LOG.debug("Cant find gateway for link %s", name) links[name]["ip"]["gateway"] = "0.0.0.0" diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py index 2fba89b22..285374a92 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -35,11 +35,11 @@ LOG = logging.getLogger(__name__) class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen): APP_NAME = 'LandslideTG' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = LandslideResourceHelper - super(LandslideTrafficGen, self).__init__(name, vnfd, task_id, + super(LandslideTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py index a3b5afa39..5c8819119 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -71,7 +71,7 @@ class PingResourceHelper(ClientResourceHelper): self._queue = Queue() self._parser = PingParser(self._queue) - def run_traffic(self, traffic_profile, *args): + def run_traffic(self, traffic_profile): # drop the connection in order to force a new one self.ssh_helper.drop_connection() @@ -103,14 +103,14 @@ class PingTrafficGen(SampleVNFTrafficGen): APP_NAME = 'Ping' RUN_WAIT = 4 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = PingSetupEnvHelper if resource_helper_type is None: resource_helper_type = PingResourceHelper - super(PingTrafficGen, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) self._result = {} def _check_status(self): diff --git a/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py b/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py index 9d452213f..5da2178af 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,9 +13,7 @@ # limitations under the License. import logging -import multiprocessing import time -import uuid from yardstick.common import constants from yardstick.common import exceptions @@ -26,8 +24,7 @@ from yardstick.network_services.vnf_generic.vnf import base as vnf_base LOG = logging.getLogger(__name__) -class PktgenTrafficGen(vnf_base.GenericTrafficGen, - vnf_base.GenericVNFEndpoint): +class PktgenTrafficGen(vnf_base.GenericTrafficGen): """DPDK Pktgen traffic generator Website: http://pktgen-dpdk.readthedocs.io/en/latest/index.html @@ -35,15 +32,8 @@ class PktgenTrafficGen(vnf_base.GenericTrafficGen, TIMEOUT = 30 - def __init__(self, name, vnfd, task_id): - vnf_base.GenericTrafficGen.__init__(self, name, vnfd, task_id) - self.queue = multiprocessing.Queue() - self._id = uuid.uuid1().int - self._mq_producer = self._setup_mq_producer(self._id) - vnf_base.GenericVNFEndpoint.__init__(self, self._id, [task_id], - self.queue) - self._consumer = vnf_base.GenericVNFConsumer([task_id], self) - self._consumer.start_rpc_server() + def __init__(self, name, vnfd): + vnf_base.GenericTrafficGen.__init__(self, name, vnfd) self._traffic_profile = None self._node_ip = vnfd['mgmt-interface'].get('ip') self._lua_node_port = self._get_lua_node_port( @@ -71,7 +61,7 @@ class PktgenTrafficGen(vnf_base.GenericTrafficGen, def wait_for_instantiate(self): # pragma: no cover pass - def runner_method_start_iteration(self, ctxt, **kwargs): + def runner_method_start_iteration(self): # pragma: no cover LOG.debug('Start method') # NOTE(ralonsoh): 'rate' should be modified between iterations. The @@ -81,11 +71,6 @@ class PktgenTrafficGen(vnf_base.GenericTrafficGen, self._traffic_profile.rate(self._rate) time.sleep(4) self._traffic_profile.stop() - self._mq_producer.tg_method_iteration(1, 1, {}) - - def runner_method_stop_iteration(self, ctxt, **kwargs): # pragma: no cover - # pragma: no cover - LOG.debug('Stop method') @staticmethod def _get_lua_node_port(service_ports): diff --git a/yardstick/network_services/vnf_generic/vnf/tg_prox.py b/yardstick/network_services/vnf_generic/vnf/tg_prox.py index d12c42ec8..65b7bac10 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_prox.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_prox.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017 Intel Corporation +# Copyright (c) 2017-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,13 +29,13 @@ class ProxTrafficGen(SampleVNFTrafficGen): LUA_PARAMETER_NAME = "gen" WAIT_TIME = 1 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): vnfd_cpy = copy.deepcopy(vnfd) - super(ProxTrafficGen, self).__init__(name, vnfd_cpy, task_id) + super(ProxTrafficGen, self).__init__(name, vnfd_cpy) self._vnf_wrapper = ProxApproxVnf( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + name, vnfd, setup_env_helper_type, resource_helper_type) self.bin_path = get_nsb_option('bin_path', '') self.name = self._vnf_wrapper.name self.ssh_helper = self._vnf_wrapper.ssh_helper diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py index 4fbbf6a40..80812876d 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2018 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,9 @@ import ipaddress import logging import six +import collections +from six import moves from yardstick.common import utils from yardstick.common import exceptions from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api @@ -28,7 +30,7 @@ LOG = logging.getLogger(__name__) WAIT_AFTER_CFG_LOAD = 10 WAIT_FOR_TRAFFIC = 30 -WAIT_PROTOCOLS_STARTED = 360 +WAIT_PROTOCOLS_STARTED = 420 class IxiaBasicScenario(object): @@ -52,13 +54,54 @@ class IxiaBasicScenario(object): def stop_protocols(self): pass - def create_traffic_model(self, traffic_profile=None): - # pylint: disable=unused-argument + def create_traffic_model(self, traffic_profile): vports = self.client.get_vports() self._uplink_vports = vports[::2] self._downlink_vports = vports[1::2] self.client.create_traffic_model(self._uplink_vports, - self._downlink_vports) + self._downlink_vports, + traffic_profile) + + def _get_stats(self): + return self.client.get_statistics() + + def generate_samples(self, resource_helper, ports, duration): + stats = self._get_stats() + + samples = {} + # this is not DPDK port num, but this is whatever number we gave + # when we selected ports and programmed the profile + for port_num in ports: + try: + # reverse lookup port name from port_num so the stats dict is descriptive + intf = resource_helper.vnfd_helper.find_interface_by_port(port_num) + port_name = intf['name'] + avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num] + min_latency = stats['Store-Forward_Min_latency_ns'][port_num] + max_latency = stats['Store-Forward_Max_latency_ns'][port_num] + samples[port_name] = { + 'RxThroughputBps': float(stats['Bytes_Rx'][port_num]) / duration, + 'TxThroughputBps': float(stats['Bytes_Tx'][port_num]) / duration, + 'InPackets': int(stats['Valid_Frames_Rx'][port_num]), + 'OutPackets': int(stats['Frames_Tx'][port_num]), + 'InBytes': int(stats['Bytes_Rx'][port_num]), + 'OutBytes': int(stats['Bytes_Tx'][port_num]), + 'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration, + 'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration, + 'LatencyAvg': utils.safe_cast(avg_latency, int, 0), + 'LatencyMin': utils.safe_cast(min_latency, int, 0), + 'LatencyMax': utils.safe_cast(max_latency, int, 0) + } + except IndexError: + pass + + return samples + + def update_tracking_options(self): + pass + + def get_tc_rfc2544_options(self): + pass class IxiaL3Scenario(IxiaBasicScenario): @@ -122,8 +165,7 @@ class IxiaL3Scenario(IxiaBasicScenario): self._add_interfaces() self._add_static_ips() - def create_traffic_model(self, traffic_profile=None): - # pylint: disable=unused-argument + def create_traffic_model(self, traffic_profile): vports = self.client.get_vports() self._uplink_vports = vports[::2] self._downlink_vports = vports[1::2] @@ -134,7 +176,8 @@ class IxiaL3Scenario(IxiaBasicScenario): for port in self._downlink_vports] self.client.create_ipv4_traffic_model(uplink_endpoints, - downlink_endpoints) + downlink_endpoints, + traffic_profile) class IxiaPppoeClientScenario(object): @@ -166,10 +209,15 @@ class IxiaPppoeClientScenario(object): traffic_profile.full_profile) endpoints_obj_pairs = \ self._get_endpoints_src_dst_obj_pairs(endpoints_id_pairs) - uplink_endpoints = endpoints_obj_pairs[::2] - downlink_endpoints = endpoints_obj_pairs[1::2] + if endpoints_obj_pairs: + uplink_endpoints = endpoints_obj_pairs[::2] + downlink_endpoints = endpoints_obj_pairs[1::2] + else: + uplink_endpoints = self._access_topologies + downlink_endpoints = self._core_topologies self.client.create_ipv4_traffic_model(uplink_endpoints, - downlink_endpoints) + downlink_endpoints, + traffic_profile) def run_protocols(self): LOG.info('PPPoE Scenario - Start Protocols') @@ -260,18 +308,14 @@ class IxiaPppoeClientScenario(object): device groups pairs between which flow groups will be created: 1. In case uplink/downlink flows in traffic profile doesn't have - specified 'port' key, flows will be created between each device - group on access port and device group on corresponding core port. + specified 'port' key, flows will be created between topologies + on corresponding access and core port. E.g.: - Device groups created on access port xe0: dg1, dg2, dg3 - Device groups created on core port xe1: dg4 + Access topology on xe0: topology1 + Core topology on xe1: topology2 Flows will be created between: - dg1 -> dg4 - dg4 -> dg1 - dg2 -> dg4 - dg4 -> dg2 - dg3 -> dg4 - dg4 -> dg3 + topology1 -> topology2 + topology2 -> topology1 2. In case uplink/downlink flows in traffic profile have specified 'port' key, flows will be created between device groups on this @@ -332,13 +376,6 @@ class IxiaPppoeClientScenario(object): [endpoint_obj_pairs.extend([up, down]) for up, down in zip(uplink_dev_groups, downlink_dev_groups)] - if not endpoint_obj_pairs: - for up, down in zip(uplink_ports, downlink_ports): - uplink_dev_groups = port_to_dev_group_mapping[up] - downlink_dev_groups = \ - port_to_dev_group_mapping[down] * len(uplink_dev_groups) - [endpoint_obj_pairs.extend(list(i)) - for i in zip(uplink_dev_groups, downlink_dev_groups)] return endpoint_obj_pairs def _fill_ixia_config(self): @@ -432,6 +469,168 @@ class IxiaPppoeClientScenario(object): bgp_type=ipv4["bgp"].get("bgp_type")) self.protocols.append(bgp_peer_obj) + def update_tracking_options(self): + priority_map = { + 'raw': 'ipv4Raw0', + 'tos': {'precedence': 'ipv4Precedence0'}, + 'dscp': {'defaultPHB': 'ipv4DefaultPhb0', + 'selectorPHB': 'ipv4ClassSelectorPhb0', + 'assuredPHB': 'ipv4AssuredForwardingPhb0', + 'expeditedPHB': 'ipv4ExpeditedForwardingPhb0'} + } + + prio_trackby_key = 'ipv4Precedence0' + + try: + priority = list(self._ixia_cfg['priority'])[0] + if priority == 'raw': + prio_trackby_key = priority_map[priority] + elif priority in ['tos', 'dscp']: + priority_type = list(self._ixia_cfg['priority'][priority])[0] + prio_trackby_key = priority_map[priority][priority_type] + except KeyError: + pass + + tracking_options = ['flowGroup0', 'vlanVlanId0', prio_trackby_key] + self.client.set_flow_tracking(tracking_options) + + def get_tc_rfc2544_options(self): + return self._ixia_cfg.get('rfc2544') + + def _get_stats(self): + return self.client.get_pppoe_scenario_statistics() + + @staticmethod + def get_flow_id_data(stats, flow_id, key): + result = [float(flow.get(key)) for flow in stats if flow['id'] == flow_id] + return sum(result) / len(result) + + def get_priority_flows_stats(self, samples, duration): + results = {} + priorities = set([flow['IP_Priority'] for flow in samples]) + for priority in priorities: + tx_frames = sum( + [int(flow['Tx_Frames']) for flow in samples + if flow['IP_Priority'] == priority]) + rx_frames = sum( + [int(flow['Rx_Frames']) for flow in samples + if flow['IP_Priority'] == priority]) + prio_flows_num = len([flow for flow in samples + if flow['IP_Priority'] == priority]) + avg_latency_ns = sum( + [int(flow['Store-Forward_Avg_latency_ns']) for flow in samples + if flow['IP_Priority'] == priority]) / prio_flows_num + min_latency_ns = min( + [int(flow['Store-Forward_Min_latency_ns']) for flow in samples + if flow['IP_Priority'] == priority]) + max_latency_ns = max( + [int(flow['Store-Forward_Max_latency_ns']) for flow in samples + if flow['IP_Priority'] == priority]) + tx_throughput = float(tx_frames) / duration + rx_throughput = float(rx_frames) / duration + results[priority] = { + 'InPackets': rx_frames, + 'OutPackets': tx_frames, + 'RxThroughput': round(rx_throughput, 3), + 'TxThroughput': round(tx_throughput, 3), + 'LatencyAvg': utils.safe_cast(avg_latency_ns, int, 0), + 'LatencyMin': utils.safe_cast(min_latency_ns, int, 0), + 'LatencyMax': utils.safe_cast(max_latency_ns, int, 0) + } + return results + + def generate_samples(self, resource_helper, ports, duration): + + stats = self._get_stats() + samples = {} + ports_stats = stats['port_statistics'] + flows_stats = stats['flow_statistic'] + pppoe_subs_per_port = stats['pppox_client_per_port'] + + # Get sorted list of ixia ports names + ixia_port_names = sorted([data['port_name'] for data in ports_stats]) + + # Set 'port_id' key for ports stats items + for item in ports_stats: + port_id = item.pop('port_name').split('-')[-1].strip() + item['port_id'] = int(port_id) + + # Set 'id' key for flows stats items + for item in flows_stats: + flow_id = item.pop('Flow_Group').split('-')[1].strip() + item['id'] = int(flow_id) + + # Set 'port_id' key for pppoe subs per port stats + for item in pppoe_subs_per_port: + port_id = item.pop('subs_port').split('-')[-1].strip() + item['port_id'] = int(port_id) + + # Map traffic flows to ports + port_flow_map = collections.defaultdict(set) + for item in flows_stats: + tx_port = item.pop('Tx_Port') + tx_port_index = ixia_port_names.index(tx_port) + port_flow_map[tx_port_index].update([item['id']]) + + # Sort ports stats + ports_stats = sorted(ports_stats, key=lambda k: k['port_id']) + + # Get priority flows stats + prio_flows_stats = self.get_priority_flows_stats(flows_stats, duration) + samples['priority_stats'] = prio_flows_stats + + # this is not DPDK port num, but this is whatever number we gave + # when we selected ports and programmed the profile + for port_num in ports: + try: + # reverse lookup port name from port_num so the stats dict is descriptive + intf = resource_helper.vnfd_helper.find_interface_by_port(port_num) + port_name = intf['name'] + port_id = ports_stats[port_num]['port_id'] + port_subs_stats = \ + [port_data for port_data in pppoe_subs_per_port + if port_data.get('port_id') == port_id] + + avg_latency = \ + sum([float(self.get_flow_id_data( + flows_stats, flow, 'Store-Forward_Avg_latency_ns')) + for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num]) + min_latency = \ + min([float(self.get_flow_id_data( + flows_stats, flow, 'Store-Forward_Min_latency_ns')) + for flow in port_flow_map[port_num]]) + max_latency = \ + max([float(self.get_flow_id_data( + flows_stats, flow, 'Store-Forward_Max_latency_ns')) + for flow in port_flow_map[port_num]]) + + samples[port_name] = { + 'RxThroughputBps': float(ports_stats[port_num]['Bytes_Rx']) / duration, + 'TxThroughputBps': float(ports_stats[port_num]['Bytes_Tx']) / duration, + 'InPackets': int(ports_stats[port_num]['Valid_Frames_Rx']), + 'OutPackets': int(ports_stats[port_num]['Frames_Tx']), + 'InBytes': int(ports_stats[port_num]['Bytes_Rx']), + 'OutBytes': int(ports_stats[port_num]['Bytes_Tx']), + 'RxThroughput': float(ports_stats[port_num]['Valid_Frames_Rx']) / duration, + 'TxThroughput': float(ports_stats[port_num]['Frames_Tx']) / duration, + 'LatencyAvg': utils.safe_cast(avg_latency, int, 0), + 'LatencyMin': utils.safe_cast(min_latency, int, 0), + 'LatencyMax': utils.safe_cast(max_latency, int, 0) + } + + if port_subs_stats: + samples[port_name].update( + {'SessionsUp': int(port_subs_stats[0]['Sessions_Up']), + 'SessionsDown': int(port_subs_stats[0]['Sessions_Down']), + 'SessionsNotStarted': int(port_subs_stats[0]['Sessions_Not_Started']), + 'SessionsTotal': int(port_subs_stats[0]['Sessions_Total'])} + ) + + except IndexError: + pass + + return samples + class IxiaRfc2544Helper(Rfc2544ResourceHelper): @@ -468,9 +667,6 @@ class IxiaResourceHelper(ClientResourceHelper): def _connect(self, client=None): self.client.connect(self.vnfd_helper) - def get_stats(self, *args, **kwargs): - return self.client.get_statistics() - def setup(self): super(IxiaResourceHelper, self).setup() self._init_ix_scenario() @@ -480,36 +676,7 @@ class IxiaResourceHelper(ClientResourceHelper): self._terminated.value = 1 def generate_samples(self, ports, duration): - stats = self.get_stats() - - samples = {} - # this is not DPDK port num, but this is whatever number we gave - # when we selected ports and programmed the profile - for port_num in ports: - try: - # reverse lookup port name from port_num so the stats dict is descriptive - intf = self.vnfd_helper.find_interface_by_port(port_num) - port_name = intf['name'] - avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num] - min_latency = stats['Store-Forward_Min_latency_ns'][port_num] - max_latency = stats['Store-Forward_Max_latency_ns'][port_num] - samples[port_name] = { - 'rx_throughput_kps': float(stats['Rx_Rate_Kbps'][port_num]), - 'tx_throughput_kps': float(stats['Tx_Rate_Kbps'][port_num]), - 'rx_throughput_mbps': float(stats['Rx_Rate_Mbps'][port_num]), - 'tx_throughput_mbps': float(stats['Tx_Rate_Mbps'][port_num]), - 'in_packets': int(stats['Valid_Frames_Rx'][port_num]), - 'out_packets': int(stats['Frames_Tx'][port_num]), - 'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration, - 'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration, - 'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0), - 'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0), - 'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0) - } - except IndexError: - pass - - return samples + return self._ix_scenario.generate_samples(self, ports, duration) def _init_ix_scenario(self): ixia_config = self.scenario_helper.scenario_cfg.get('ixia_config', 'IxiaBasic') @@ -530,13 +697,17 @@ class IxiaResourceHelper(ClientResourceHelper): self._ix_scenario.apply_config() self._ix_scenario.create_traffic_model(traffic_profile) - def run_traffic(self, traffic_profile, *args): + def update_tracking_options(self): + self._ix_scenario.update_tracking_options() + + def run_traffic(self, traffic_profile): if self._terminated.value: return min_tol = self.rfc_helper.tolerance_low max_tol = self.rfc_helper.tolerance_high precision = self.rfc_helper.tolerance_precision + resolution = self.rfc_helper.resolution default = "00:00:00:00:00:00" self._build_ports() @@ -557,17 +728,19 @@ class IxiaResourceHelper(ClientResourceHelper): try: while not self._terminated.value: - first_run = traffic_profile.execute_traffic( - self, self.client, mac) + first_run = traffic_profile.execute_traffic(self, self.client, + mac) self.client_started.value = 1 # pylint: disable=unnecessary-lambda utils.wait_until_true(lambda: self.client.is_traffic_stopped(), timeout=traffic_profile.config.duration * 2) + rfc2544_opts = self._ix_scenario.get_tc_rfc2544_options() samples = self.generate_samples(traffic_profile.ports, traffic_profile.config.duration) completed, samples = traffic_profile.get_drop_percentage( - samples, min_tol, max_tol, precision, first_run=first_run) + samples, min_tol, max_tol, precision, resolution, + first_run=first_run, tc_rfc2544_opts=rfc2544_opts) self._queue.put(samples) if completed: @@ -577,23 +750,92 @@ class IxiaResourceHelper(ClientResourceHelper): LOG.exception('Run Traffic terminated') self._ix_scenario.stop_protocols() + self.client_started.value = 0 self._terminated.value = 1 - def collect_kpi(self): - self.rfc_helper.iteration.value += 1 - return super(IxiaResourceHelper, self).collect_kpi() + def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover + LOG.info("Ixia resource_helper run_test") + if self._terminated.value: + return + + min_tol = self.rfc_helper.tolerance_low + max_tol = self.rfc_helper.tolerance_high + precision = self.rfc_helper.tolerance_precision + resolution = self.rfc_helper.resolution + default = "00:00:00:00:00:00" + + self._build_ports() + traffic_profile.update_traffic_profile(self) + self._initialize_client(traffic_profile) + + mac = {} + for port_name in self.vnfd_helper.port_pairs.all_ports: + intf = self.vnfd_helper.find_interface(name=port_name) + virt_intf = intf["virtual-interface"] + # we only know static traffic id by reading the json + # this is used by _get_ixia_trafficrofile + port_num = self.vnfd_helper.port_num(intf) + mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default) + mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default) + + self._ix_scenario.run_protocols() + + try: + completed = False + self.rfc_helper.iteration.value = 0 + self.client_started.value = 1 + while completed is False and not self._terminated.value: + LOG.info("Wait for task ...") + + try: + task = tasks_queue.get(True, 5) + except moves.queue.Empty: + continue + else: + if task != 'RUN_TRAFFIC': + continue + + self.rfc_helper.iteration.value += 1 + LOG.info("Got %s task, start iteration %d", task, + self.rfc_helper.iteration.value) + first_run = traffic_profile.execute_traffic(self, self.client, + mac) + # pylint: disable=unnecessary-lambda + utils.wait_until_true(lambda: self.client.is_traffic_stopped(), + timeout=traffic_profile.config.duration * 2) + samples = self.generate_samples(traffic_profile.ports, + traffic_profile.config.duration) + + completed, samples = traffic_profile.get_drop_percentage( + samples, min_tol, max_tol, precision, resolution, + first_run=first_run) + self._queue.put(samples) + + if completed: + LOG.debug("IxiaResourceHelper::run_test - test completed") + results_queue.put('COMPLETE') + else: + results_queue.put('CONTINUE') + tasks_queue.task_done() + + except Exception: # pylint: disable=broad-except + LOG.exception('Run Traffic terminated') + + self._ix_scenario.stop_protocols() + self.client_started.value = 0 + LOG.debug("IxiaResourceHelper::run_test done") class IxiaTrafficGen(SampleVNFTrafficGen): APP_NAME = 'Ixia' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = IxiaResourceHelper - super(IxiaTrafficGen, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) self._ixia_traffic_gen = None self.ixia_file_name = '' self.vnf_port_pairs = [] diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py index 7ecb12478..a9c0222ac 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,12 +15,14 @@ import logging import time +from six import moves from yardstick.common import utils from yardstick.network_services.vnf_generic.vnf import sample_vnf from yardstick.network_services.vnf_generic.vnf import tg_trex +from trex_stl_lib.trex_stl_exceptions import STLError -LOGGING = logging.getLogger(__name__) +LOG = logging.getLogger(__name__) class TrexRfcResourceHelper(tg_trex.TrexResourceHelper): @@ -48,7 +50,8 @@ class TrexRfcResourceHelper(tg_trex.TrexResourceHelper): completed, output = traffic_profile.get_drop_percentage( samples, self.rfc2544_helper.tolerance_low, self.rfc2544_helper.tolerance_high, - self.rfc2544_helper.correlated_traffic) + self.rfc2544_helper.correlated_traffic, + self.rfc2544_helper.resolution) self._queue.put(output) return completed @@ -58,6 +61,56 @@ class TrexRfcResourceHelper(tg_trex.TrexResourceHelper): def clear_client_stats(self, ports): self.client.clear_stats(ports=ports) + def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover + LOG.debug("Trex resource_helper run_test") + if self._terminated.value: + return + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + self._queue.cancel_join_thread() + try: + self._build_ports() + self.client = self._connect() + self.client.reset(ports=self.all_ports) + self.client.remove_all_streams(self.all_ports) # remove all streams + traffic_profile.register_generator(self) + + completed = False + self.rfc2544_helper.iteration.value = 0 + self.client_started.value = 1 + while completed is False and not self._terminated.value: + LOG.debug("Wait for task ...") + try: + task = tasks_queue.get(True, 5) + except moves.queue.Empty: + LOG.debug("Wait for task timeout, continue waiting...") + continue + else: + if task != 'RUN_TRAFFIC': + continue + self.rfc2544_helper.iteration.value += 1 + LOG.info("Got %s task, start iteration %d", task, + self.rfc2544_helper.iteration.value) + completed = self._run_traffic_once(traffic_profile) + if completed: + LOG.debug("%s::run_test - test completed", + self.__class__.__name__) + results_queue.put('COMPLETE') + else: + results_queue.put('CONTINUE') + tasks_queue.task_done() + + self.client.stop(self.all_ports) + self.client.disconnect() + self._terminated.value = 0 + except STLError: + if self._terminated.value: + LOG.debug("traffic generator is stopped") + return # return if trex/tg server is stopped. + raise + + self.client_started.value = 0 + LOG.debug("%s::run_test done", self.__class__.__name__) class TrexTrafficGenRFC(tg_trex.TrexTrafficGen): """ @@ -65,9 +118,9 @@ class TrexTrafficGenRFC(tg_trex.TrexTrafficGen): traffic for rfc2544 testcase. """ - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = TrexRfcResourceHelper - super(TrexTrafficGenRFC, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py index 4296da84c..0cb66a714 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -179,6 +179,8 @@ class TrexResourceHelper(ClientResourceHelper): 'tx_throughput_bps': float(port_stats.get('tx_bps', 0.0)), 'in_packets': int(port_stats.get('ipackets', 0)), 'out_packets': int(port_stats.get('opackets', 0)), + 'in_bytes': int(port_stats.get('ibytes', 0)), + 'out_bytes': int(port_stats.get('obytes', 0)), 'timestamp': timestamp } @@ -200,14 +202,15 @@ class TrexTrafficGen(SampleVNFTrafficGen): APP_NAME = 'TRex' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = TrexResourceHelper + if setup_env_helper_type is None: setup_env_helper_type = TrexDpdkVnfSetupEnvHelper - super(TrexTrafficGen, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) def _check_status(self): return self.resource_helper.check_status() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex_vpp.py b/yardstick/network_services/vnf_generic/vnf/tg_trex_vpp.py new file mode 100644 index 000000000..846304880 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex_vpp.py @@ -0,0 +1,178 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from trex_stl_lib.trex_stl_exceptions import STLError + +from yardstick.common.utils import safe_cast +from yardstick.network_services.vnf_generic.vnf.sample_vnf import \ + Rfc2544ResourceHelper +from yardstick.network_services.vnf_generic.vnf.sample_vnf import \ + SampleVNFTrafficGen +from yardstick.network_services.vnf_generic.vnf.tg_trex import \ + TrexDpdkVnfSetupEnvHelper +from yardstick.network_services.vnf_generic.vnf.tg_trex import \ + TrexResourceHelper + +LOGGING = logging.getLogger(__name__) + + +class TrexVppResourceHelper(TrexResourceHelper): + + def __init__(self, setup_helper, rfc_helper_type=None): + super(TrexVppResourceHelper, self).__init__(setup_helper) + + if rfc_helper_type is None: + rfc_helper_type = Rfc2544ResourceHelper + + self.rfc2544_helper = rfc_helper_type(self.scenario_helper) + + self.loss = None + self.sent = None + self.latency = None + + def generate_samples(self, stats=None, ports=None, port_pg_id=None, + latency=False): + samples = {} + if stats is None: + stats = self.get_stats(ports) + for pname in (intf['name'] for intf in self.vnfd_helper.interfaces): + port_num = self.vnfd_helper.port_num(pname) + port_stats = stats.get(port_num, {}) + samples[pname] = { + 'rx_throughput_fps': float(port_stats.get('rx_pps', 0.0)), + 'tx_throughput_fps': float(port_stats.get('tx_pps', 0.0)), + 'rx_throughput_bps': float(port_stats.get('rx_bps', 0.0)), + 'tx_throughput_bps': float(port_stats.get('tx_bps', 0.0)), + 'in_packets': int(port_stats.get('ipackets', 0)), + 'out_packets': int(port_stats.get('opackets', 0)), + } + + if latency: + pg_id_list = port_pg_id.get_pg_ids(port_num) + samples[pname]['latency'] = {} + for pg_id in pg_id_list: + latency_global = stats.get('latency', {}) + pg_latency = latency_global.get(pg_id, {}).get('latency') + + t_min = safe_cast(pg_latency.get("total_min", 0.0), float, + -1.0) + t_avg = safe_cast(pg_latency.get("average", 0.0), float, + -1.0) + t_max = safe_cast(pg_latency.get("total_max", 0.0), float, + -1.0) + + latency = { + "min_latency": t_min, + "max_latency": t_max, + "avg_latency": t_avg, + } + samples[pname]['latency'][pg_id] = latency + + return samples + + def _run_traffic_once(self, traffic_profile): + self.client_started.value = 1 + traffic_profile.execute_traffic(self) + return True + + def run_traffic(self, traffic_profile): + self._queue.cancel_join_thread() + traffic_profile.init_queue(self._queue) + super(TrexVppResourceHelper, self).run_traffic(traffic_profile) + + @staticmethod + def fmt_latency(lat_min, lat_avg, lat_max): + t_min = int(round(safe_cast(lat_min, float, -1.0))) + t_avg = int(round(safe_cast(lat_avg, float, -1.0))) + t_max = int(round(safe_cast(lat_max, float, -1.0))) + + return "/".join(str(tmp) for tmp in (t_min, t_avg, t_max)) + + def send_traffic_on_tg(self, ports, port_pg_id, duration, rate, + latency=False): + try: + # Choose rate and start traffic: + self.client.start(ports=ports, mult=rate, duration=duration) + # Block until done: + try: + self.client.wait_on_traffic(ports=ports, timeout=duration + 20) + except STLError as err: + self.client.stop(ports) + LOGGING.error("TRex stateless timeout error: %s", err) + + if self.client.get_warnings(): + for warning in self.client.get_warnings(): + LOGGING.warning(warning) + + # Read the stats after the test + stats = self.client.get_stats() + + packets_in = [] + packets_out = [] + for port in ports: + packets_in.append(stats[port]["ipackets"]) + packets_out.append(stats[port]["opackets"]) + + if latency: + self.latency = [] + pg_id_list = port_pg_id.get_pg_ids(port) + for pg_id in pg_id_list: + latency_global = stats.get('latency', {}) + pg_latency = latency_global.get(pg_id, {}).get( + 'latency') + lat = self.fmt_latency( + str(pg_latency.get("total_min")), + str(pg_latency.get("average")), + str(pg_latency.get("total_max"))) + LOGGING.info( + "latencyStream%s(usec)=%s", pg_id, lat) + self.latency.append(lat) + + self.sent = sum(packets_out) + total_rcvd = sum(packets_in) + self.loss = self.sent - total_rcvd + LOGGING.info("rate=%s, totalReceived=%s, totalSent=%s," + " frameLoss=%s", rate, total_rcvd, self.sent, + self.loss) + return stats + except STLError as err: + LOGGING.error("TRex stateless runtime error: %s", err) + raise RuntimeError('TRex stateless runtime error') + + +class TrexTrafficGenVpp(SampleVNFTrafficGen): + APP_NAME = 'TRex' + WAIT_TIME = 20 + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = TrexDpdkVnfSetupEnvHelper + if resource_helper_type is None: + resource_helper_type = TrexVppResourceHelper + + super(TrexTrafficGenVpp, self).__init__( + name, vnfd, setup_env_helper_type, resource_helper_type) + + def _check_status(self): + return self.resource_helper.check_status() + + def _start_server(self): + super(TrexTrafficGenVpp, self)._start_server() + self.resource_helper.start() + + def wait_for_instantiate(self): + return self._wait_for_process() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_vcmts_pktgen.py b/yardstick/network_services/vnf_generic/vnf/tg_vcmts_pktgen.py new file mode 100755 index 000000000..c6df9d04c --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_vcmts_pktgen.py @@ -0,0 +1,215 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import socket +import yaml +import os + +from yardstick.network_services.vnf_generic.vnf import sample_vnf +from yardstick.common import exceptions + + +LOG = logging.getLogger(__name__) + + +class PktgenHelper(object): + + RETRY_SECONDS = 0.5 + RETRY_COUNT = 20 + CONNECT_TIMEOUT = 5 + + def __init__(self, host, port=23000): + self.host = host + self.port = port + self.connected = False + + def _connect(self): + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ret = True + try: + self._sock.settimeout(self.CONNECT_TIMEOUT) + self._sock.connect((self.host, self.port)) + except (socket.gaierror, socket.error, socket.timeout): + self._sock.close() + ret = False + + return ret + + def connect(self): + if self.connected: + return True + LOG.info("Connecting to pktgen instance at %s...", self.host) + for idx in range(self.RETRY_COUNT): + self.connected = self._connect() + if self.connected: + return True + LOG.debug("Connection attempt %d: Unable to connect to %s, " \ + "retrying in %d seconds", + idx, self.host, self.RETRY_SECONDS) + time.sleep(self.RETRY_SECONDS) + + LOG.error("Unable to connect to pktgen instance on %s !", + self.host) + return False + + + def send_command(self, command): + if not self.connected: + LOG.error("Pktgen socket is not connected") + return False + + try: + self._sock.sendall((command + "\n").encode()) + time.sleep(1) + except (socket.timeout, socket.error): + LOG.error("Error sending command '%s'", command) + return False + + return True + + +class VcmtsPktgenSetupEnvHelper(sample_vnf.SetupEnvHelper): + + BASE_PARAMETERS = "export LUA_PATH=/vcmts/Pktgen.lua;"\ + + "export CMK_PROC_FS=/host/proc;" + + PORTS_COUNT = 8 + + def generate_pcap_filename(self, port_cfg): + return port_cfg['traffic_type'] + "_" + port_cfg['num_subs'] \ + + "cms_" + port_cfg['num_ofdm'] + "ofdm.pcap" + + def find_port_cfg(self, ports_cfg, port_name): + for port_cfg in ports_cfg: + if port_name in port_cfg: + return port_cfg + return None + + def build_pktgen_parameters(self, pod_cfg): + ports_cfg = pod_cfg['ports'] + port_cfg = list() + + for i in range(self.PORTS_COUNT): + port_cfg.append(self.find_port_cfg(ports_cfg, 'port_' + str(i))) + + pktgen_parameters = self.BASE_PARAMETERS + " " \ + + " /pktgen-config/setup.sh " + pod_cfg['pktgen_id'] \ + + " " + pod_cfg['num_ports'] + + for i in range(self.PORTS_COUNT): + pktgen_parameters += " " + port_cfg[i]['net_pktgen'] + + for i in range(self.PORTS_COUNT): + pktgen_parameters += " " + self.generate_pcap_filename(port_cfg[i]) + + return pktgen_parameters + + def start_pktgen(self, pod_cfg): + self.ssh_helper.drop_connection() + cmd = self.build_pktgen_parameters(pod_cfg) + LOG.debug("Executing: '%s'", cmd) + self.ssh_helper.send_command(cmd) + LOG.info("Pktgen executed") + + def setup_vnf_environment(self): + pass + + +class VcmtsPktgen(sample_vnf.SampleVNFTrafficGen): + + TG_NAME = 'VcmtsPktgen' + APP_NAME = 'VcmtsPktgen' + RUN_WAIT = 4 + DEFAULT_RATE = 8.0 + + PKTGEN_BASE_PORT = 23000 + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = VcmtsPktgenSetupEnvHelper + super(VcmtsPktgen, self).__init__( + name, vnfd, setup_env_helper_type, resource_helper_type) + + self.pktgen_address = vnfd['mgmt-interface']['ip'] + LOG.info("Pktgen container '%s', IP: %s", name, self.pktgen_address) + + def extract_pod_cfg(self, pktgen_pods_cfg, pktgen_id): + for pod_cfg in pktgen_pods_cfg: + if pod_cfg['pktgen_id'] == pktgen_id: + return pod_cfg + return None + + def instantiate(self, scenario_cfg, context_cfg): + super(VcmtsPktgen, self).instantiate(scenario_cfg, context_cfg) + self._start_server() + options = scenario_cfg.get('options', {}) + self.pktgen_rate = options.get('pktgen_rate', self.DEFAULT_RATE) + + try: + pktgen_values_filepath = options['pktgen_values'] + except KeyError: + raise KeyError("Missing pktgen_values key in scenario options" \ + "section of the task definition file") + + if not os.path.isfile(pktgen_values_filepath): + raise RuntimeError("The pktgen_values file path provided " \ + "does not exists") + + # The yaml_loader.py (SafeLoader) underlying regex has an issue + # with reading PCI addresses (processed as double). so the + # BaseLoader is used here. + with open(pktgen_values_filepath) as stream: + pktgen_values = yaml.load(stream, Loader=yaml.BaseLoader) + + if pktgen_values == None: + raise RuntimeError("Error reading pktgen_values file provided (" + + pktgen_values_filepath + ")") + + self.pktgen_id = int(options[self.name]['pktgen_id']) + self.resource_helper.pktgen_id = self.pktgen_id + + self.pktgen_helper = PktgenHelper(self.pktgen_address, + self.PKTGEN_BASE_PORT + self.pktgen_id) + + pktgen_pods_cfg = pktgen_values['topology']['pktgen_pods'] + + self.pod_cfg = self.extract_pod_cfg(pktgen_pods_cfg, + str(self.pktgen_id)) + + if self.pod_cfg == None: + raise KeyError("Pktgen with id " + str(self.pktgen_id) + \ + " was not found") + + self.setup_helper.start_pktgen(self.pod_cfg) + + def run_traffic(self, traffic_profile): + if not self.pktgen_helper.connect(): + raise exceptions.PktgenActionError(command="connect") + LOG.info("Connected to pktgen instance at %s", self.pktgen_address) + + commands = [] + for i in range(self.setup_helper.PORTS_COUNT): + commands.append('pktgen.set("' + str(i) + '", "rate", ' + + "%0.1f" % self.pktgen_rate + ');') + + commands.append('pktgen.start("all");') + + for command in commands: + if self.pktgen_helper.send_command(command): + LOG.debug("Command '%s' sent to pktgen", command) + LOG.info("Traffic started on %s...", self.name) + return True diff --git a/yardstick/network_services/vnf_generic/vnf/udp_replay.py b/yardstick/network_services/vnf_generic/vnf/udp_replay.py index e3fde1a79..a3b0b9fd9 100644 --- a/yardstick/network_services/vnf_generic/vnf/udp_replay.py +++ b/yardstick/network_services/vnf_generic/vnf/udp_replay.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -60,14 +60,15 @@ class UdpReplayApproxVnf(SampleVNF): PIPELINE_COMMAND = REPLAY_PIPELINE_COMMAND - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = UdpReplayResourceHelper + if setup_env_helper_type is None: setup_env_helper_type = UdpReplaySetupEnvHelper - super(UdpReplayApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(UdpReplayApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) def _build_pipeline_kwargs(self): ports = self.vnfd_helper.port_pairs.all_ports @@ -108,7 +109,7 @@ class UdpReplayApproxVnf(SampleVNF): def collect_kpi(self): def get_sum(offset): - return sum(int(i) for i in split_stats[offset::5]) + return sum(int(i) for i in split_stats[offset::6]) # we can't get KPIs if the VNF is down check_if_process_failed(self._vnf_process) @@ -116,7 +117,7 @@ class UdpReplayApproxVnf(SampleVNF): stats = self.get_stats() stats_words = stats.split() - split_stats = stats_words[stats_words.index('0'):][:number_of_ports * 5] + split_stats = stats_words[stats_words.index('arp_pkts') + 1:][:number_of_ports * 6] physical_node = ctx_base.Context.get_physical_node_from_server( self.scenario_helper.nodes[self.name]) diff --git a/yardstick/network_services/vnf_generic/vnf/vcmts_vnf.py b/yardstick/network_services/vnf_generic/vnf/vcmts_vnf.py new file mode 100755 index 000000000..0b48ef4e9 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/vcmts_vnf.py @@ -0,0 +1,273 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import yaml + +from influxdb import InfluxDBClient + +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SetupEnvHelper +from yardstick.common import constants +from yardstick.common import exceptions +from yardstick.network_services.vnf_generic.vnf.base import GenericVNF +from yardstick.network_services.vnf_generic.vnf.sample_vnf import ScenarioHelper +from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper +from yardstick.network_services.utils import get_nsb_option + + +LOG = logging.getLogger(__name__) + + +class InfluxDBHelper(object): + + INITIAL_VALUE = 'now() - 1m' + + def __init__(self, vcmts_influxdb_ip, vcmts_influxdb_port): + self._vcmts_influxdb_ip = vcmts_influxdb_ip + self._vcmts_influxdb_port = vcmts_influxdb_port + self._last_upstream_rx = self.INITIAL_VALUE + self._last_values_time = dict() + + def start(self): + self._read_client = InfluxDBClient(host=self._vcmts_influxdb_ip, + port=self._vcmts_influxdb_port, + database='collectd') + self._write_client = InfluxDBClient(host=constants.INFLUXDB_IP, + port=constants.INFLUXDB_PORT, + database='collectd') + + def _get_last_value_time(self, measurement): + if measurement in self._last_values_time: + return self._last_values_time[measurement] + return self.INITIAL_VALUE + + def _set_last_value_time(self, measurement, time): + self._last_values_time[measurement] = "'" + time + "'" + + def _query_measurement(self, measurement): + # There is a delay before influxdb flushes the data + query = "SELECT * FROM " + measurement + " WHERE time > " \ + + self._get_last_value_time(measurement) \ + + " ORDER BY time ASC;" + query_result = self._read_client.query(query) + if len(query_result.keys()) == 0: + return None + return query_result.get_points(measurement) + + def _rw_measurment(self, measurement, columns): + query_result = self._query_measurement(measurement) + if query_result == None: + return + + points_to_write = list() + for entry in query_result: + point = { + "measurement": measurement, + "tags": { + "type": entry['type'], + "host": entry['host'] + }, + "time": entry['time'], + "fields": {} + } + + for column in columns: + if column == 'value': + point["fields"][column] = float(entry[column]) + else: + point["fields"][column] = entry[column] + + points_to_write.append(point) + self._set_last_value_time(measurement, entry['time']) + + # Write the points to yardstick database + if self._write_client.write_points(points_to_write): + LOG.debug("%d new points written to '%s' measurement", + len(points_to_write), measurement) + + def copy_kpi(self): + self._rw_measurment("cpu_value", ["instance", "type_instance", "value"]) + self._rw_measurment("cpufreq_value", ["type_instance", "value"]) + self._rw_measurment("downstream_rx", ["value"]) + self._rw_measurment("downstream_tx", ["value"]) + self._rw_measurment("downstream_value", ["value"]) + self._rw_measurment("ds_per_cm_value", ["instance", "value"]) + self._rw_measurment("intel_rdt_value", ["instance", "type_instance", "value"]) + self._rw_measurment("turbostat_value", ["instance", "type_instance", "value"]) + self._rw_measurment("upstream_rx", ["value"]) + self._rw_measurment("upstream_tx", ["value"]) + self._rw_measurment("upstream_value", ["value"]) + + +class VcmtsdSetupEnvHelper(SetupEnvHelper): + + BASE_PARAMETERS = "export LD_LIBRARY_PATH=/opt/collectd/lib:;"\ + + "export CMK_PROC_FS=/host/proc;" + + def build_us_parameters(self, pod_cfg): + return self.BASE_PARAMETERS + " " \ + + " /opt/bin/cmk isolate --conf-dir=/etc/cmk" \ + + " --socket-id=" + pod_cfg['cpu_socket_id'] \ + + " --pool=shared" \ + + " /vcmts-config/run_upstream.sh " + pod_cfg['sg_id'] \ + + " " + pod_cfg['ds_core_type'] \ + + " " + pod_cfg['num_ofdm'] + "ofdm" \ + + " " + pod_cfg['num_subs'] + "cm" \ + + " " + pod_cfg['cm_crypto'] \ + + " " + pod_cfg['qat'] \ + + " " + pod_cfg['net_us'] \ + + " " + pod_cfg['power_mgmt'] + + def build_ds_parameters(self, pod_cfg): + return self.BASE_PARAMETERS + " " \ + + " /opt/bin/cmk isolate --conf-dir=/etc/cmk" \ + + " --socket-id=" + pod_cfg['cpu_socket_id'] \ + + " --pool=" + pod_cfg['ds_core_type'] \ + + " /vcmts-config/run_downstream.sh " + pod_cfg['sg_id'] \ + + " " + pod_cfg['ds_core_type'] \ + + " " + pod_cfg['ds_core_pool_index'] \ + + " " + pod_cfg['num_ofdm'] + "ofdm" \ + + " " + pod_cfg['num_subs'] + "cm" \ + + " " + pod_cfg['cm_crypto'] \ + + " " + pod_cfg['qat'] \ + + " " + pod_cfg['net_ds'] \ + + " " + pod_cfg['power_mgmt'] + + def build_cmd(self, stream_dir, pod_cfg): + if stream_dir == 'ds': + return self.build_ds_parameters(pod_cfg) + else: + return self.build_us_parameters(pod_cfg) + + def run_vcmtsd(self, stream_dir, pod_cfg): + cmd = self.build_cmd(stream_dir, pod_cfg) + LOG.debug("Executing %s", cmd) + self.ssh_helper.send_command(cmd) + + def setup_vnf_environment(self): + pass + + +class VcmtsVNF(GenericVNF): + + RUN_WAIT = 4 + + def __init__(self, name, vnfd): + super(VcmtsVNF, self).__init__(name, vnfd) + self.name = name + self.bin_path = get_nsb_option('bin_path', '') + self.scenario_helper = ScenarioHelper(self.name) + self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path) + + self.setup_helper = VcmtsdSetupEnvHelper(self.vnfd_helper, + self.ssh_helper, + self.scenario_helper) + + def extract_pod_cfg(self, vcmts_pods_cfg, sg_id): + for pod_cfg in vcmts_pods_cfg: + if pod_cfg['sg_id'] == sg_id: + return pod_cfg + + def instantiate(self, scenario_cfg, context_cfg): + self._update_collectd_options(scenario_cfg, context_cfg) + self.scenario_helper.scenario_cfg = scenario_cfg + self.context_cfg = context_cfg + + options = scenario_cfg.get('options', {}) + + try: + self.vcmts_influxdb_ip = options['vcmts_influxdb_ip'] + self.vcmts_influxdb_port = options['vcmts_influxdb_port'] + except KeyError: + raise KeyError("Missing destination InfluxDB details in scenario" \ + " section of the task definition file") + + try: + vcmtsd_values_filepath = options['vcmtsd_values'] + except KeyError: + raise KeyError("Missing vcmtsd_values key in scenario options" \ + "section of the task definition file") + + if not os.path.isfile(vcmtsd_values_filepath): + raise RuntimeError("The vcmtsd_values file path provided " \ + "does not exists") + + # The yaml_loader.py (SafeLoader) underlying regex has an issue + # with reading PCI addresses (processed as double). so the + # BaseLoader is used here. + with open(vcmtsd_values_filepath) as stream: + vcmtsd_values = yaml.load(stream, Loader=yaml.BaseLoader) + + if vcmtsd_values == None: + raise RuntimeError("Error reading vcmtsd_values file provided (" + + vcmtsd_values_filepath + ")") + + vnf_options = options.get(self.name, {}) + sg_id = str(vnf_options['sg_id']) + stream_dir = vnf_options['stream_dir'] + + try: + vcmts_pods_cfg = vcmtsd_values['topology']['vcmts_pods'] + except KeyError: + raise KeyError("Missing vcmts_pods key in the " \ + "vcmtsd_values file provided") + + pod_cfg = self.extract_pod_cfg(vcmts_pods_cfg, sg_id) + if pod_cfg == None: + raise exceptions.IncorrectConfig(error_msg="Service group " + sg_id + " not found") + + self.setup_helper.run_vcmtsd(stream_dir, pod_cfg) + + def _update_collectd_options(self, scenario_cfg, context_cfg): + scenario_options = scenario_cfg.get('options', {}) + generic_options = scenario_options.get('collectd', {}) + scenario_node_options = scenario_options.get(self.name, {})\ + .get('collectd', {}) + context_node_options = context_cfg.get('nodes', {})\ + .get(self.name, {}).get('collectd', {}) + + options = generic_options + self._update_options(options, scenario_node_options) + self._update_options(options, context_node_options) + + self.setup_helper.collectd_options = options + + def _update_options(self, options, additional_options): + for k, v in additional_options.items(): + if isinstance(v, dict) and k in options: + options[k].update(v) + else: + options[k] = v + + def wait_for_instantiate(self): + pass + + def terminate(self): + pass + + def scale(self, flavor=""): + pass + + def collect_kpi(self): + self.influxdb_helper.copy_kpi() + return {"n/a": "n/a"} + + def start_collect(self): + self.influxdb_helper = InfluxDBHelper(self.vcmts_influxdb_ip, + self.vcmts_influxdb_port) + self.influxdb_helper.start() + + def stop_collect(self): + pass diff --git a/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py index a1523dee3..743f2d4bb 100644 --- a/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -52,9 +52,12 @@ class FWApproxVnf(SampleVNF): 'packets_dropped': 3, } - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = FWApproxSetupEnvHelper - super(FWApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(FWApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) + + def wait_for_instantiate(self): + """Wait for VNF to initialize""" + self.wait_for_initialize() diff --git a/yardstick/network_services/vnf_generic/vnf/vims_vnf.py b/yardstick/network_services/vnf_generic/vnf/vims_vnf.py new file mode 100644 index 000000000..0e339b171 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/vims_vnf.py @@ -0,0 +1,105 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from yardstick.network_services.vnf_generic.vnf import sample_vnf + +LOG = logging.getLogger(__name__) + + +class VimsSetupEnvHelper(sample_vnf.SetupEnvHelper): + + def setup_vnf_environment(self): + LOG.debug('VimsSetupEnvHelper:\n') + + +class VimsResourceHelper(sample_vnf.ClientResourceHelper): + pass + + +class VimsPcscfVnf(sample_vnf.SampleVNF): + + APP_NAME = "VimsPcscf" + APP_WORD = "VimsPcscf" + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = VimsResourceHelper + if setup_env_helper_type is None: + setup_env_helper_type = VimsSetupEnvHelper + super(VimsPcscfVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) + + def wait_for_instantiate(self): + pass + + def _run(self): + pass + + def start_collect(self): + # TODO + pass + + def collect_kpi(self): + # TODO + pass + + +class VimsHssVnf(sample_vnf.SampleVNF): + + APP_NAME = "VimsHss" + APP_WORD = "VimsHss" + CMD = "sudo /media/generate_user.sh {} {} >> /dev/null 2>&1" + + def __init__(self, name, vnfd, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = VimsResourceHelper + if setup_env_helper_type is None: + setup_env_helper_type = VimsSetupEnvHelper + super(VimsHssVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) + self.start_user = 1 + self.end_user = 10000 + self.WAIT_TIME = 600 + + def instantiate(self, scenario_cfg, context_cfg): + LOG.debug("scenario_cfg=%s\n", scenario_cfg) + self.start_user = scenario_cfg.get("options", {}).get("start_user", self.start_user) + self.end_user = scenario_cfg.get("options", {}).get("end_user", self.end_user) + # TODO + # Need to check HSS services are ready before generating user accounts + # Now, adding time sleep that manually configured by user + # to wait for HSS services. + # Note: for heat, waiting time is too long (~ 600s) + self.WAIT_TIME = scenario_cfg.get("options", {}).get("wait_time", self.WAIT_TIME) + time.sleep(self.WAIT_TIME) + LOG.debug("Generate user accounts from %d to %d\n", + self.start_user, self.end_user) + cmd = self.CMD.format(self.start_user, self.end_user) + self.ssh_helper.execute(cmd, None, 3600, False) + + def wait_for_instantiate(self): + pass + + def start_collect(self): + # TODO + pass + + def collect_kpi(self): + # TODO + pass diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index dd3221386..322ecd016 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -158,12 +158,11 @@ class VpeApproxVnf(SampleVNF): COLLECT_KPI = VPE_COLLECT_KPI WAIT_TIME = 20 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = VpeApproxSetupEnvHelper - super(VpeApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) def get_stats(self, *args, **kwargs): raise NotImplementedError diff --git a/yardstick/network_services/vnf_generic/vnf/vpp_helpers.py b/yardstick/network_services/vnf_generic/vnf/vpp_helpers.py new file mode 100644 index 000000000..fe8e7b2ba --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/vpp_helpers.py @@ -0,0 +1,751 @@ +# Copyright (c) 2019 Viosoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import binascii +import ipaddress +import json +import logging +import os +import re +import tempfile +import time +from collections import OrderedDict + +from yardstick.common import constants +from yardstick.common import exceptions +from yardstick.network_services.helpers.cpu import CpuSysCores +from yardstick.network_services.vnf_generic.vnf.sample_vnf import \ + DpdkVnfSetupEnvHelper + +LOG = logging.getLogger(__name__) + + +class VppConfigGenerator(object): + VPP_LOG_FILE = '/tmp/vpe.log' + + def __init__(self): + self._nodeconfig = {} + self._vpp_config = '' + + def add_config_item(self, config, value, path): + if len(path) == 1: + config[path[0]] = value + return + if path[0] not in config: + config[path[0]] = {} + elif isinstance(config[path[0]], str): + config[path[0]] = {} if config[path[0]] == '' \ + else {config[path[0]]: ''} + self.add_config_item(config[path[0]], value, path[1:]) + + def add_unix_log(self, value=None): + path = ['unix', 'log'] + if value is None: + value = self.VPP_LOG_FILE + self.add_config_item(self._nodeconfig, value, path) + + def add_unix_cli_listen(self, value='/run/vpp/cli.sock'): + path = ['unix', 'cli-listen'] + self.add_config_item(self._nodeconfig, value, path) + + def add_unix_nodaemon(self): + path = ['unix', 'nodaemon'] + self.add_config_item(self._nodeconfig, '', path) + + def add_unix_coredump(self): + path = ['unix', 'full-coredump'] + self.add_config_item(self._nodeconfig, '', path) + + def add_dpdk_dev(self, *devices): + for device in devices: + if VppConfigGenerator.pci_dev_check(device): + path = ['dpdk', 'dev {0}'.format(device)] + self.add_config_item(self._nodeconfig, '', path) + + def add_dpdk_cryptodev(self, count, cryptodev): + for i in range(count): + cryptodev_config = 'dev {0}'.format( + re.sub(r'\d.\d$', '1.' + str(i), cryptodev)) + path = ['dpdk', cryptodev_config] + self.add_config_item(self._nodeconfig, '', path) + self.add_dpdk_uio_driver('igb_uio') + + def add_dpdk_sw_cryptodev(self, sw_pmd_type, socket_id, count): + for _ in range(count): + cryptodev_config = 'vdev cryptodev_{0}_pmd,socket_id={1}'. \ + format(sw_pmd_type, str(socket_id)) + path = ['dpdk', cryptodev_config] + self.add_config_item(self._nodeconfig, '', path) + + def add_dpdk_dev_default_rxq(self, value): + path = ['dpdk', 'dev default', 'num-rx-queues'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_dev_default_rxd(self, value): + path = ['dpdk', 'dev default', 'num-rx-desc'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_dev_default_txd(self, value): + path = ['dpdk', 'dev default', 'num-tx-desc'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_log_level(self, value): + path = ['dpdk', 'log-level'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_socketmem(self, value): + path = ['dpdk', 'socket-mem'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_num_mbufs(self, value): + path = ['dpdk', 'num-mbufs'] + self.add_config_item(self._nodeconfig, value, path) + + def add_dpdk_uio_driver(self, value=None): + path = ['dpdk', 'uio-driver'] + self.add_config_item(self._nodeconfig, value, path) + + def add_cpu_main_core(self, value): + path = ['cpu', 'main-core'] + self.add_config_item(self._nodeconfig, value, path) + + def add_cpu_corelist_workers(self, value): + path = ['cpu', 'corelist-workers'] + self.add_config_item(self._nodeconfig, value, path) + + def add_heapsize(self, value): + path = ['heapsize'] + self.add_config_item(self._nodeconfig, value, path) + + def add_ip6_hash_buckets(self, value): + path = ['ip6', 'hash-buckets'] + self.add_config_item(self._nodeconfig, value, path) + + def add_ip6_heap_size(self, value): + path = ['ip6', 'heap-size'] + self.add_config_item(self._nodeconfig, value, path) + + def add_ip_heap_size(self, value): + path = ['ip', 'heap-size'] + self.add_config_item(self._nodeconfig, value, path) + + def add_statseg_size(self, value): + path = ['statseg', 'size'] + self.add_config_item(self._nodeconfig, value, path) + + def add_plugin(self, state, *plugins): + for plugin in plugins: + path = ['plugins', 'plugin {0}'.format(plugin), state] + self.add_config_item(self._nodeconfig, ' ', path) + + def add_dpdk_no_multi_seg(self): + path = ['dpdk', 'no-multi-seg'] + self.add_config_item(self._nodeconfig, '', path) + + def add_dpdk_no_tx_checksum_offload(self): + path = ['dpdk', 'no-tx-checksum-offload'] + self.add_config_item(self._nodeconfig, '', path) + + def dump_config(self, obj=None, level=-1): + if obj is None: + obj = self._nodeconfig + obj = OrderedDict(sorted(obj.items())) + + indent = ' ' + if level >= 0: + self._vpp_config += '{}{{\n'.format(level * indent) + if isinstance(obj, dict): + for key, val in obj.items(): + if hasattr(val, '__iter__') and not isinstance(val, str): + self._vpp_config += '{}{}\n'.format((level + 1) * indent, + key) + self.dump_config(val, level + 1) + else: + self._vpp_config += '{}{} {}\n'.format( + (level + 1) * indent, + key, val) + if level >= 0: + self._vpp_config += '{}}}\n'.format(level * indent) + + return self._vpp_config + + @staticmethod + def pci_dev_check(pci_dev): + pattern = re.compile("^[0-9A-Fa-f]{4}:[0-9A-Fa-f]{2}:" + "[0-9A-Fa-f]{2}\\.[0-9A-Fa-f]$") + if not pattern.match(pci_dev): + raise ValueError('PCI address {addr} is not in valid format ' + 'xxxx:xx:xx.x'.format(addr=pci_dev)) + return True + + +class VppSetupEnvHelper(DpdkVnfSetupEnvHelper): + APP_NAME = "vpp" + CFG_CONFIG = "/etc/vpp/startup.conf" + CFG_SCRIPT = "" + PIPELINE_COMMAND = "" + QAT_DRIVER = "qat_dh895xcc" + VNF_TYPE = "IPSEC" + VAT_BIN_NAME = 'vpp_api_test' + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(VppSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, + scenario_helper) + self.sys_cores = CpuSysCores(self.ssh_helper) + + def kill_vnf(self): + ret_code, _, _ = \ + self.ssh_helper.execute( + 'service {name} stop'.format(name=self.APP_NAME)) + if int(ret_code): + raise RuntimeError( + 'Failed to stop service {name}'.format(name=self.APP_NAME)) + + def tear_down(self): + pass + + def start_vpp_service(self): + ret_code, _, _ = \ + self.ssh_helper.execute( + 'service {name} restart'.format(name=self.APP_NAME)) + if int(ret_code): + raise RuntimeError( + 'Failed to start service {name}'.format(name=self.APP_NAME)) + + def _update_vnfd_helper(self, additional_data, iface_key=None): + for k, v in additional_data.items(): + if iface_key is None: + if isinstance(v, dict) and k in self.vnfd_helper: + self.vnfd_helper[k].update(v) + else: + self.vnfd_helper[k] = v + else: + if isinstance(v, + dict) and k in self.vnfd_helper.find_virtual_interface( + ifname=iface_key): + self.vnfd_helper.find_virtual_interface(ifname=iface_key)[ + k].update(v) + else: + self.vnfd_helper.find_virtual_interface(ifname=iface_key)[ + k] = v + + def get_value_by_interface_key(self, interface, key): + try: + return self.vnfd_helper.find_virtual_interface( + ifname=interface).get(key) + except (KeyError, ValueError): + return None + + def crypto_device_init(self, pci_addr, numvfs): + # QAT device must be re-bound to kernel driver before initialization. + self.dpdk_bind_helper.load_dpdk_driver(self.QAT_DRIVER) + + # Stop VPP to prevent deadlock. + self.kill_vnf() + + current_driver = self.get_pci_dev_driver(pci_addr.replace(':', r'\:')) + if current_driver is not None: + self.pci_driver_unbind(pci_addr) + + # Bind to kernel driver. + self.dpdk_bind_helper.bind(pci_addr, self.QAT_DRIVER.replace('qat_', '')) + + # Initialize QAT VFs. + if numvfs > 0: + self.set_sriov_numvfs(pci_addr, numvfs) + + def get_sriov_numvfs(self, pf_pci_addr): + command = 'cat /sys/bus/pci/devices/{pci}/sriov_numvfs'. \ + format(pci=pf_pci_addr.replace(':', r'\:')) + _, stdout, _ = self.ssh_helper.execute(command) + try: + return int(stdout) + except ValueError: + LOG.debug('Reading sriov_numvfs info failed') + return 0 + + def set_sriov_numvfs(self, pf_pci_addr, numvfs=0): + command = "sh -c 'echo {num} | tee /sys/bus/pci/devices/{pci}/sriov_numvfs'". \ + format(num=numvfs, pci=pf_pci_addr.replace(':', r'\:')) + self.ssh_helper.execute(command) + + def pci_driver_unbind(self, pci_addr): + command = "sh -c 'echo {pci} | tee /sys/bus/pci/devices/{pcie}/driver/unbind'". \ + format(pci=pci_addr, pcie=pci_addr.replace(':', r'\:')) + self.ssh_helper.execute(command) + + def get_pci_dev_driver(self, pci_addr): + cmd = 'lspci -vmmks {0}'.format(pci_addr) + ret_code, stdout, _ = self.ssh_helper.execute(cmd) + if int(ret_code): + raise RuntimeError("'{0}' failed".format(cmd)) + for line in stdout.splitlines(): + if not line: + continue + name = None + value = None + try: + name, value = line.split("\t", 1) + except ValueError: + if name == "Driver:": + return None + if name == 'Driver:': + return value + return None + + def vpp_create_ipsec_tunnels(self, if1_ip_addr, if2_ip_addr, if_name, + n_tunnels, n_connections, crypto_alg, + crypto_key, integ_alg, integ_key, addrs_ip, + spi_1=10000, spi_2=20000): + mask_length = 32 + if n_connections <= n_tunnels: + count = 1 + else: + count = int(n_connections / n_tunnels) + addr_ip_i = int(ipaddress.ip_address(str(addrs_ip))) + dst_start_ip = addr_ip_i + + tmp_fd, tmp_path = tempfile.mkstemp() + + vpp_ifname = self.get_value_by_interface_key(if_name, 'vpp_name') + ckey = binascii.hexlify(crypto_key.encode()) + ikey = binascii.hexlify(integ_key.encode()) + + integ = '' + if crypto_alg.alg_name != 'aes-gcm-128': + integ = 'integ_alg {integ_alg} ' \ + 'local_integ_key {local_integ_key} ' \ + 'remote_integ_key {remote_integ_key} ' \ + .format(integ_alg=integ_alg.alg_name, + local_integ_key=ikey, + remote_integ_key=ikey) + create_tunnels_cmds = 'ipsec_tunnel_if_add_del ' \ + 'local_spi {local_spi} ' \ + 'remote_spi {remote_spi} ' \ + 'crypto_alg {crypto_alg} ' \ + 'local_crypto_key {local_crypto_key} ' \ + 'remote_crypto_key {remote_crypto_key} ' \ + '{integ} ' \ + 'local_ip {local_ip} ' \ + 'remote_ip {remote_ip}\n' + start_tunnels_cmds = 'ip_add_del_route {raddr}/{mask} via {addr} ipsec{i}\n' \ + 'exec set interface unnumbered ipsec{i} use {uifc}\n' \ + 'sw_interface_set_flags ipsec{i} admin-up\n' + + with os.fdopen(tmp_fd, 'w') as tmp_file: + for i in range(0, n_tunnels): + create_tunnel = create_tunnels_cmds.format(local_spi=spi_1 + i, + remote_spi=spi_2 + i, + crypto_alg=crypto_alg.alg_name, + local_crypto_key=ckey, + remote_crypto_key=ckey, + integ=integ, + local_ip=if1_ip_addr, + remote_ip=if2_ip_addr) + tmp_file.write(create_tunnel) + self.execute_script(tmp_path, json_out=False, copy_on_execute=True) + os.remove(tmp_path) + + tmp_fd, tmp_path = tempfile.mkstemp() + + with os.fdopen(tmp_fd, 'w') as tmp_file: + for i in range(0, n_tunnels): + if count > 1: + dst_start_ip = addr_ip_i + i * count + dst_end_ip = ipaddress.ip_address(dst_start_ip + count - 1) + ips = [ipaddress.ip_address(ip) for ip in + [str(ipaddress.ip_address(dst_start_ip)), + str(dst_end_ip)]] + lowest_ip, highest_ip = min(ips), max(ips) + mask_length = self.get_prefix_length(int(lowest_ip), + int(highest_ip), + lowest_ip.max_prefixlen) + # TODO check duplicate route for some IPs + elif count == 1: + dst_start_ip = addr_ip_i + i + start_tunnel = start_tunnels_cmds.format( + raddr=str(ipaddress.ip_address(dst_start_ip)), + mask=mask_length, + addr=if2_ip_addr, + i=i, count=count, + uifc=vpp_ifname) + tmp_file.write(start_tunnel) + # TODO add route for remain IPs + + self.execute_script(tmp_path, json_out=False, copy_on_execute=True) + os.remove(tmp_path) + + def apply_config(self, vpp_cfg, restart_vpp=True): + vpp_config = vpp_cfg.dump_config() + ret, _, _ = \ + self.ssh_helper.execute('echo "{config}" | sudo tee {filename}'. + format(config=vpp_config, + filename=self.CFG_CONFIG)) + if ret != 0: + raise RuntimeError('Writing config file failed') + if restart_vpp: + self.start_vpp_service() + + def vpp_route_add(self, network, prefix_len, gateway=None, interface=None, + use_sw_index=True, resolve_attempts=10, + count=1, vrf=None, lookup_vrf=None, multipath=False, + weight=None, local=False): + if interface: + if use_sw_index: + int_cmd = ('sw_if_index {}'.format( + self.get_value_by_interface_key(interface, + 'vpp_sw_index'))) + else: + int_cmd = interface + else: + int_cmd = '' + + rap = 'resolve-attempts {}'.format(resolve_attempts) \ + if resolve_attempts else '' + + via = 'via {}'.format(gateway) if gateway else '' + + cnt = 'count {}'.format(count) \ + if count else '' + + vrf = 'vrf {}'.format(vrf) if vrf else '' + + lookup_vrf = 'lookup-in-vrf {}'.format( + lookup_vrf) if lookup_vrf else '' + + multipath = 'multipath' if multipath else '' + + weight = 'weight {}'.format(weight) if weight else '' + + local = 'local' if local else '' + + with VatTerminal(self.ssh_helper, json_param=False) as vat: + vat.vat_terminal_exec_cmd_from_template('add_route.vat', + network=network, + prefix_length=prefix_len, + via=via, + vrf=vrf, + interface=int_cmd, + resolve_attempts=rap, + count=cnt, + lookup_vrf=lookup_vrf, + multipath=multipath, + weight=weight, + local=local) + + def add_arp_on_dut(self, iface_key, ip_address, mac_address): + with VatTerminal(self.ssh_helper) as vat: + return vat.vat_terminal_exec_cmd_from_template( + 'add_ip_neighbor.vat', + sw_if_index=self.get_value_by_interface_key(iface_key, + 'vpp_sw_index'), + ip_address=ip_address, mac_address=mac_address) + + def set_ip(self, interface, address, prefix_length): + with VatTerminal(self.ssh_helper) as vat: + return vat.vat_terminal_exec_cmd_from_template( + 'add_ip_address.vat', + sw_if_index=self.get_value_by_interface_key(interface, + 'vpp_sw_index'), + address=address, prefix_length=prefix_length) + + def set_interface_state(self, interface, state): + sw_if_index = self.get_value_by_interface_key(interface, + 'vpp_sw_index') + + if state == 'up': + state = 'admin-up link-up' + elif state == 'down': + state = 'admin-down link-down' + else: + raise ValueError('Unexpected interface state: {}'.format(state)) + with VatTerminal(self.ssh_helper) as vat: + return vat.vat_terminal_exec_cmd_from_template( + 'set_if_state.vat', sw_if_index=sw_if_index, state=state) + + def vpp_set_interface_mtu(self, interface, mtu=9200): + sw_if_index = self.get_value_by_interface_key(interface, + 'vpp_sw_index') + if sw_if_index: + with VatTerminal(self.ssh_helper, json_param=False) as vat: + vat.vat_terminal_exec_cmd_from_template( + "hw_interface_set_mtu.vat", sw_if_index=sw_if_index, + mtu=mtu) + + def vpp_interfaces_ready_wait(self, timeout=30): + if_ready = False + not_ready = [] + start = time.time() + while not if_ready: + out = self.vpp_get_interface_data() + if time.time() - start > timeout: + for interface in out: + if interface.get('admin_up_down') == 1: + if interface.get('link_up_down') != 1: + LOG.debug('%s link-down', + interface.get('interface_name')) + raise RuntimeError('timeout, not up {0}'.format(not_ready)) + not_ready = [] + for interface in out: + if interface.get('admin_up_down') == 1: + if interface.get('link_up_down') != 1: + not_ready.append(interface.get('interface_name')) + if not not_ready: + if_ready = True + else: + LOG.debug('Interfaces still in link-down state: %s, ' + 'waiting...', not_ready) + time.sleep(1) + + def vpp_get_interface_data(self, interface=None): + with VatTerminal(self.ssh_helper) as vat: + response = vat.vat_terminal_exec_cmd_from_template( + "interface_dump.vat") + data = response[0] + if interface is not None: + if isinstance(interface, str): + param = "interface_name" + elif isinstance(interface, int): + param = "sw_if_index" + else: + raise TypeError + for data_if in data: + if data_if[param] == interface: + return data_if + return dict() + return data + + def update_vpp_interface_data(self): + data = {} + interface_dump_json = self.execute_script_json_out( + "dump_interfaces.vat") + interface_list = json.loads(interface_dump_json) + for interface in self.vnfd_helper.interfaces: + if_mac = interface['virtual-interface']['local_mac'] + interface_dict = VppSetupEnvHelper.get_vpp_interface_by_mac( + interface_list, if_mac) + if not interface_dict: + LOG.debug('Interface %s not found by MAC %s', interface, + if_mac) + continue + data[interface['virtual-interface']['ifname']] = { + 'vpp_name': interface_dict["interface_name"], + 'vpp_sw_index': interface_dict["sw_if_index"] + } + for iface_key, updated_vnfd in data.items(): + self._update_vnfd_helper(updated_vnfd, iface_key) + + def iface_update_numa(self): + iface_numa = {} + for interface in self.vnfd_helper.interfaces: + cmd = "cat /sys/bus/pci/devices/{}/numa_node".format( + interface["virtual-interface"]["vpci"]) + ret, out, _ = self.ssh_helper.execute(cmd) + if ret == 0: + try: + numa_node = int(out) + if numa_node < 0: + if self.vnfd_helper["cpuinfo"][-1][3] + 1 == 1: + iface_numa[ + interface['virtual-interface']['ifname']] = { + 'numa_node': 0 + } + else: + raise ValueError + else: + iface_numa[ + interface['virtual-interface']['ifname']] = { + 'numa_node': numa_node + } + except ValueError: + LOG.debug( + 'Reading numa location failed for: %s', + interface["virtual-interface"]["vpci"]) + for iface_key, updated_vnfd in iface_numa.items(): + self._update_vnfd_helper(updated_vnfd, iface_key) + + def execute_script(self, vat_name, json_out=True, copy_on_execute=False): + if copy_on_execute: + self.ssh_helper.put_file(vat_name, vat_name) + remote_file_path = vat_name + else: + vat_path = self.ssh_helper.join_bin_path("vpp", "templates") + remote_file_path = '{0}/{1}'.format(vat_path, vat_name) + + cmd = "{vat_bin} {json} in {vat_path} script".format( + vat_bin=self.VAT_BIN_NAME, + json="json" if json_out is True else "", + vat_path=remote_file_path) + + try: + return self.ssh_helper.execute(cmd=cmd) + except Exception: + raise RuntimeError("VAT script execution failed: {0}".format(cmd)) + + def execute_script_json_out(self, vat_name): + vat_path = self.ssh_helper.join_bin_path("vpp", "templates") + remote_file_path = '{0}/{1}'.format(vat_path, vat_name) + + _, stdout, _ = self.execute_script(vat_name, json_out=True) + return self.cleanup_vat_json_output(stdout, vat_file=remote_file_path) + + @staticmethod + def cleanup_vat_json_output(json_output, vat_file=None): + retval = json_output + clutter = ['vat#', 'dump_interface_table error: Misc', + 'dump_interface_table:6019: JSON output supported only ' \ + 'for VPE API calls and dump_stats_table'] + if vat_file: + clutter.append("{0}(2):".format(vat_file)) + for garbage in clutter: + retval = retval.replace(garbage, '') + return retval.strip() + + @staticmethod + def _convert_mac_to_number_list(mac_address): + list_mac = [] + for num in mac_address.split(":"): + list_mac.append(int(num, 16)) + return list_mac + + @staticmethod + def get_vpp_interface_by_mac(interfaces_list, mac_address): + interface_dict = {} + list_mac_address = VppSetupEnvHelper._convert_mac_to_number_list( + mac_address) + LOG.debug("MAC address %s converted to list %s.", mac_address, + list_mac_address) + for interface in interfaces_list: + # TODO: create vat json integrity checking and move there + if "l2_address" not in interface: + raise KeyError( + "key l2_address not found in interface dict." + "Probably input list is not parsed from correct VAT " + "json output.") + if "l2_address_length" not in interface: + raise KeyError( + "key l2_address_length not found in interface " + "dict. Probably input list is not parsed from correct " + "VAT json output.") + mac_from_json = interface["l2_address"][:6] + if mac_from_json == list_mac_address: + if interface["l2_address_length"] != 6: + raise ValueError("l2_address_length value is not 6.") + interface_dict = interface + break + return interface_dict + + @staticmethod + def get_prefix_length(number1, number2, bits): + for i in range(bits): + if number1 >> i == number2 >> i: + return bits - i + return 0 + + +class VatTerminal(object): + + __VAT_PROMPT = ("vat# ",) + __LINUX_PROMPT = (":~# ", ":~$ ", "~]$ ", "~]# ") + + + def __init__(self, ssh_helper, json_param=True): + json_text = ' json' if json_param else '' + self.json = json_param + self.ssh_helper = ssh_helper + EXEC_RETRY = 3 + + try: + self._tty = self.ssh_helper.interactive_terminal_open() + except Exception: + raise RuntimeError("Cannot open interactive terminal") + + for _ in range(EXEC_RETRY): + try: + self.ssh_helper.interactive_terminal_exec_command( + self._tty, + 'sudo -S {0}{1}'.format(VppSetupEnvHelper.VAT_BIN_NAME, + json_text), + self.__VAT_PROMPT) + except exceptions.SSHTimeout: + continue + else: + break + + self._exec_failure = False + self.vat_stdout = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.vat_terminal_close() + + def vat_terminal_exec_cmd(self, cmd): + try: + out = self.ssh_helper.interactive_terminal_exec_command(self._tty, + cmd, + self.__VAT_PROMPT) + self.vat_stdout = out + except exceptions.SSHTimeout: + self._exec_failure = True + raise RuntimeError( + "VPP is not running on node. VAT command {0} execution failed". + format(cmd)) + if self.json: + obj_start = out.find('{') + obj_end = out.rfind('}') + array_start = out.find('[') + array_end = out.rfind(']') + + if obj_start == -1 and array_start == -1: + raise RuntimeError( + "VAT command {0}: no JSON data.".format(cmd)) + + if obj_start < array_start or array_start == -1: + start = obj_start + end = obj_end + 1 + else: + start = array_start + end = array_end + 1 + out = out[start:end] + json_out = json.loads(out) + return json_out + else: + return None + + def vat_terminal_close(self): + if not self._exec_failure: + try: + self.ssh_helper.interactive_terminal_exec_command(self._tty, + 'quit', + self.__LINUX_PROMPT) + except exceptions.SSHTimeout: + raise RuntimeError("Failed to close VAT console") + try: + self.ssh_helper.interactive_terminal_close(self._tty) + except Exception: + raise RuntimeError("Cannot close interactive terminal") + + def vat_terminal_exec_cmd_from_template(self, vat_template_file, **args): + file_path = os.path.join(constants.YARDSTICK_ROOT_PATH, + 'yardstick/resources/templates/', + vat_template_file) + with open(file_path, 'r') as template_file: + cmd_template = template_file.readlines() + ret = [] + for line_tmpl in cmd_template: + vat_cmd = line_tmpl.format(**args) + ret.append(self.vat_terminal_exec_cmd(vat_cmd.replace('\n', ''))) + return ret |