diff options
Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf/sample_vnf.py')
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/sample_vnf.py | 395 |
1 files changed, 265 insertions, 130 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 77488c479..a369a3ae6 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2018 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,19 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Base class implementation for generic vnf implementation """ -from collections import Mapping import logging -from multiprocessing import Queue, Value, Process - +import decimal +from multiprocessing import Queue, Value, Process, JoinableQueue import os import posixpath import re import subprocess import time -import six from trex_stl_lib.trex_stl_client import LoggerApi from trex_stl_lib.trex_stl_client import STLClient @@ -32,17 +29,17 @@ from yardstick.benchmark.contexts.base import Context from yardstick.common import exceptions as y_exceptions from yardstick.common.process import check_if_process_failed from yardstick.common import utils +from yardstick.common import yaml_loader from yardstick.network_services import constants from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig -from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.nfvi.resource import ResourceProfile from yardstick.network_services.utils import get_nsb_option from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper - +from yardstick.benchmark.contexts.node import NodeContext LOG = logging.getLogger(__name__) @@ -60,6 +57,7 @@ class SetupEnvHelper(object): self.vnfd_helper = vnfd_helper self.ssh_helper = ssh_helper self.scenario_helper = scenario_helper + self.collectd_options = {} def build_config(self): raise NotImplementedError @@ -114,19 +112,6 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.used_drivers = None self.dpdk_bind_helper = DpdkBindHelper(ssh_helper) - def _setup_hugepages(self): - meminfo = utils.read_meminfo(self.ssh_helper) - hp_size_kb = int(meminfo['Hugepagesize']) - hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16) - nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb)) - self.ssh_helper.execute('echo %s | sudo tee %s' % - (nr_hugepages, self.NR_HUGEPAGES_PATH)) - hp = six.BytesIO() - self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp) - nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0]) - LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s', - hp_size_kb, nr_hugepages, nr_hugepages_set) - def build_config(self): vnf_cfg = self.scenario_helper.vnf_cfg task_path = self.scenario_helper.task_path @@ -144,6 +129,13 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): 'vnf_type': self.VNF_TYPE, } + # read actions/rules from file + acl_options = None + acl_file_name = self.scenario_helper.options.get('rules') + if acl_file_name: + with utils.open_relative_file(acl_file_name, task_path) as infile: + acl_options = yaml_loader.yaml_load(infile) + config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path) config_basename = posixpath.basename(self.CFG_CONFIG) @@ -176,13 +168,18 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): new_config = self._update_packet_type(new_config, traffic_options) self.ssh_helper.upload_config_file(config_basename, new_config) self.ssh_helper.upload_config_file(script_basename, - multiport.generate_script(self.vnfd_helper)) + multiport.generate_script(self.vnfd_helper, + self.get_flows_config(acl_options))) LOG.info("Provision and start the %s", self.APP_NAME) self._build_pipeline_kwargs() return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs) - def _build_pipeline_kwargs(self): + def get_flows_config(self, options=None): # pylint: disable=unused-argument + """No actions/rules (flows) by default""" + return None + + def _build_pipeline_kwargs(self, cfg_file=None, script=None): tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) # count the number of actual ports in the list of pairs # remove duplicate ports @@ -193,11 +190,20 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): port_nums = self.vnfd_helper.port_nums(ports) # create mask from all the dpdk port numbers ports_mask_hex = hex(sum(2 ** num for num in port_nums)) + + vnf_cfg = self.scenario_helper.vnf_cfg + lb_config = vnf_cfg.get('lb_config', 'SW') + worker_threads = vnf_cfg.get('worker_threads', 3) + hwlb = '' + if lb_config == 'HW': + hwlb = ' --hwlb %s' % worker_threads + self.pipeline_kwargs = { - 'cfg_file': self.CFG_CONFIG, - 'script': self.CFG_SCRIPT, + 'cfg_file': cfg_file if cfg_file else self.CFG_CONFIG, + 'script': script if script else self.CFG_SCRIPT, 'port_mask_hex': ports_mask_hex, 'tool_path': tool_path, + 'hwlb': hwlb, } def setup_vnf_environment(self): @@ -218,18 +224,16 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): def _setup_dpdk(self): """Setup DPDK environment needed for VNF to run""" - self._setup_hugepages() + hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16) + utils.setup_hugepages(self.ssh_helper, hugepages_gb * 1024 * 1024) self.dpdk_bind_helper.load_dpdk_driver() exit_status = self.dpdk_bind_helper.check_dpdk_driver() if exit_status == 0: return - - def get_collectd_options(self): - options = self.scenario_helper.all_options.get("collectd", {}) - # override with specific node settings - options.update(self.scenario_helper.options.get("collectd", {})) - return options + else: + LOG.critical("DPDK Driver not installed") + return def _setup_resources(self): # what is this magic? how do we know which socket is for which port? @@ -243,11 +247,11 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): # this won't work because we don't have DPDK port numbers yet ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num) port_names = (intf["name"] for intf in ports) - collectd_options = self.get_collectd_options() - plugins = collectd_options.get("plugins", {}) + plugins = self.collectd_options.get("plugins", {}) + interval = self.collectd_options.get("interval") # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, - plugins=plugins, interval=collectd_options.get("interval"), + plugins=plugins, interval=interval, timeout=self.scenario_helper.timeout) def _check_interface_fields(self): @@ -306,6 +310,7 @@ class ResourceHelper(object): self.resource = None self.setup_helper = setup_helper self.ssh_helper = setup_helper.ssh_helper + self._enable = True def setup(self): self.resource = self.setup_helper.setup_vnf_environment() @@ -313,22 +318,33 @@ class ResourceHelper(object): def generate_cfg(self): pass + def update_from_context(self, context, attr_name): + """Disable resource helper in case of baremetal context. + + And update appropriate node collectd options in context + """ + if isinstance(context, NodeContext): + self._enable = False + context.update_collectd_options_for_node(self.setup_helper.collectd_options, + attr_name) + def _collect_resource_kpi(self): result = {} status = self.resource.check_if_system_agent_running("collectd")[0] - if status == 0: + if status == 0 and self._enable: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} return result def start_collect(self): - self.resource.initiate_systemagent(self.ssh_helper.bin_path) - self.resource.start() - self.resource.amqp_process_for_nfvi_kpi() + if self._enable: + self.resource.initiate_systemagent(self.ssh_helper.bin_path) + self.resource.start() + self.resource.amqp_process_for_nfvi_kpi() def stop_collect(self): - if self.resource: + if self.resource and self._enable: self.resource.stop() def collect_kpi(self): @@ -372,39 +388,14 @@ class ClientResourceHelper(ResourceHelper): LOG.error('TRex client not connected') return {} - def generate_samples(self, ports, key=None, default=None): - # needs to be used ports - last_result = self.get_stats(ports) - key_value = last_result.get(key, default) - - if not isinstance(last_result, Mapping): # added for mock unit test - self._terminated.value = 1 - return {} - - samples = {} - # recalculate port for interface and see if it matches ports provided - for intf in self.vnfd_helper.interfaces: - name = intf["name"] - port = self.vnfd_helper.port_num(name) - if port in ports: - xe_value = last_result.get(port, {}) - samples[name] = { - "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), - "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), - "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), - "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), - "in_packets": int(xe_value.get("ipackets", 0)), - "out_packets": int(xe_value.get("opackets", 0)), - } - if key: - samples[name][key] = key_value - return samples + def _get_samples(self, ports, port_pg_id=False): + raise NotImplementedError() def _run_traffic_once(self, traffic_profile): traffic_profile.execute_traffic(self) self.client_started.value = 1 time.sleep(self.RUN_DURATION) - samples = self.generate_samples(traffic_profile.ports) + samples = self._get_samples(traffic_profile.ports) time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) @@ -417,12 +408,17 @@ class ClientResourceHelper(ResourceHelper): try: self._build_ports() self.client = self._connect() + if self.client is None: + LOG.critical("Failure to Connect ... unable to continue") + return + self.client.reset(ports=self.all_ports) self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) while self._terminated.value == 0: - self._run_traffic_once(traffic_profile) + if self._run_traffic_once(traffic_profile): + self._terminated.value = 1 self.client.stop(self.all_ports) self.client.disconnect() @@ -465,22 +461,35 @@ class ClientResourceHelper(ResourceHelper): server=self.vnfd_helper.mgmt_interface["ip"], verbose_level=LoggerApi.VERBOSE_QUIET) - # try to connect with 5s intervals, 30s max + # try to connect with 5s intervals for idx in range(6): try: client.connect() - break + for idx2 in range(6): + if client.is_connected(): + return client + LOG.info("Waiting to confirm connection %s .. Attempt %s", + idx, idx2) + time.sleep(1) + client.disconnect(stop_traffic=True, release_ports=True) except STLError: LOG.info("Unable to connect to Trex Server.. Attempt %s", idx) time.sleep(5) - return client + if client.is_connected(): + return client + else: + LOG.critical("Connection failure ..TRex username: %s server: %s", + self.vnfd_helper.mgmt_interface["user"], + self.vnfd_helper.mgmt_interface["ip"]) + return None class Rfc2544ResourceHelper(object): DEFAULT_CORRELATED_TRAFFIC = False DEFAULT_LATENCY = False DEFAULT_TOLERANCE = '0.0001 - 0.0001' + DEFAULT_RESOLUTION = '0.1' def __init__(self, scenario_helper): super(Rfc2544ResourceHelper, self).__init__() @@ -491,6 +500,8 @@ class Rfc2544ResourceHelper(object): self._rfc2544 = None self._tolerance_low = None self._tolerance_high = None + self._tolerance_precision = None + self._resolution = None @property def rfc2544(self): @@ -511,6 +522,12 @@ class Rfc2544ResourceHelper(object): return self._tolerance_high @property + def tolerance_precision(self): + if self._tolerance_precision is None: + self.get_rfc_tolerance() + return self._tolerance_precision + + @property def correlated_traffic(self): if self._correlated_traffic is None: self._correlated_traffic = \ @@ -524,14 +541,25 @@ class Rfc2544ResourceHelper(object): self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY) return self._latency + @property + def resolution(self): + if self._resolution is None: + self._resolution = float(self.get_rfc2544('resolution', + self.DEFAULT_RESOLUTION)) + return self._resolution + def get_rfc2544(self, name, default=None): return self.rfc2544.get(name, default) def get_rfc_tolerance(self): tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE) - tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-'))) - self._tolerance_low = next(tolerance_iter) - self._tolerance_high = next(tolerance_iter, self.tolerance_low) + tolerance_iter = iter(sorted( + decimal.Decimal(t.strip()) for t in tolerance_str.split('-'))) + tolerance_low = next(tolerance_iter) + tolerance_high = next(tolerance_iter, tolerance_low) + self._tolerance_precision = abs(tolerance_high.as_tuple().exponent) + self._tolerance_high = float(tolerance_high) + self._tolerance_low = float(tolerance_low) class SampleVNFDeployHelper(object): @@ -643,7 +671,6 @@ class SampleVNF(GenericVNF): self.resource_helper = resource_helper_type(self.setup_helper) self.context_cfg = None - self.nfvi_context = None self.pipeline_kwargs = {} self.uplink_ports = None self.downlink_ports = None @@ -657,49 +684,6 @@ class SampleVNF(GenericVNF): self.vnf_port_pairs = None self._vnf_process = None - def _build_ports(self): - self._port_pairs = PortPairs(self.vnfd_helper.interfaces) - self.networks = self._port_pairs.networks - self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports) - self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports) - self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports) - - def _get_route_data(self, route_index, route_type): - route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', [])) - for _ in range(route_index): - next(route_iter, '') - return next(route_iter, {}).get(route_type, '') - - def _get_port0localip6(self): - return_value = self._get_route_data(0, 'network') - LOG.info("_get_port0localip6 : %s", return_value) - return return_value - - def _get_port1localip6(self): - return_value = self._get_route_data(1, 'network') - LOG.info("_get_port1localip6 : %s", return_value) - return return_value - - def _get_port0prefixlen6(self): - return_value = self._get_route_data(0, 'netmask') - LOG.info("_get_port0prefixlen6 : %s", return_value) - return return_value - - def _get_port1prefixlen6(self): - return_value = self._get_route_data(1, 'netmask') - LOG.info("_get_port1prefixlen6 : %s", return_value) - return return_value - - def _get_port0gateway6(self): - return_value = self._get_route_data(0, 'network') - LOG.info("_get_port0gateway6 : %s", return_value) - return return_value - - def _get_port1gateway6(self): - return_value = self._get_route_data(1, 'network') - LOG.info("_get_port1gateway6 : %s", return_value) - return return_value - def _start_vnf(self): self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) @@ -710,10 +694,13 @@ class SampleVNF(GenericVNF): pass def instantiate(self, scenario_cfg, context_cfg): + self._update_collectd_options(scenario_cfg, context_cfg) self.scenario_helper.scenario_cfg = scenario_cfg self.context_cfg = context_cfg - self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) - # self.nfvi_context = None + self.resource_helper.update_from_context( + Context.get_context_from_server(self.scenario_helper.nodes[self.name]), + self.scenario_helper.nodes[self.name] + ) # vnf deploy is unsupported, use ansible playbooks if self.scenario_helper.options.get("vnf_deploy", False): @@ -721,6 +708,54 @@ class SampleVNF(GenericVNF): self.resource_helper.setup() self._start_vnf() + def _update_collectd_options(self, scenario_cfg, context_cfg): + """Update collectd configuration options + This function retrieves all collectd options contained in the test case + + definition builds a single dictionary combining them. The following fragment + represents a test case with the collectd options and priorities (1 highest, 3 lowest): + --- + schema: yardstick:task:0.1 + scenarios: + - type: NSPerf + nodes: + tg__0: trafficgen_0.yardstick + vnf__0: vnf_0.yardstick + options: + collectd: + <options> # COLLECTD priority 3 + vnf__0: + collectd: + plugins: + load + <options> # COLLECTD priority 2 + context: + type: Node + name: yardstick + nfvi_type: baremetal + file: /etc/yardstick/nodes/pod_ixia.yaml # COLLECTD priority 1 + """ + scenario_options = scenario_cfg.get('options', {}) + generic_options = scenario_options.get('collectd', {}) + scenario_node_options = scenario_options.get(self.name, {})\ + .get('collectd', {}) + context_node_options = context_cfg.get('nodes', {})\ + .get(self.name, {}).get('collectd', {}) + + options = generic_options + self._update_options(options, scenario_node_options) + self._update_options(options, context_node_options) + + self.setup_helper.collectd_options = options + + def _update_options(self, options, additional_options): + """Update collectd options and plugins dictionary""" + for k, v in additional_options.items(): + if isinstance(v, dict) and k in options: + options[k].update(v) + else: + options[k] = v + def wait_for_instantiate(self): buf = [] time.sleep(self.WAIT_TIME) # Give some time for config to load @@ -736,7 +771,6 @@ class SampleVNF(GenericVNF): LOG.info("%s VNF is up and running.", self.APP_NAME) self._vnf_up_post() self.queue_wrapper.clear() - self.resource_helper.start_collect() return self._vnf_process.exitcode if "PANIC" in message: @@ -749,6 +783,59 @@ class SampleVNF(GenericVNF): # by other VNF output self.q_in.put('\r\n') + def wait_for_initialize(self): + buf = [] + vnf_prompt_found = False + prompt_command = '\r\n' + script_name = 'non_existent_script_name' + done_string = 'Cannot open file "{}"'.format(script_name) + time.sleep(self.WAIT_TIME) # Give some time for config to load + while True: + if not self._vnf_process.is_alive(): + raise RuntimeError("%s VNF process died." % self.APP_NAME) + while self.q_out.qsize() > 0: + buf.append(self.q_out.get()) + message = ''.join(buf) + + if self.VNF_PROMPT in message and not vnf_prompt_found: + # Once we got VNF promt, it doesn't mean that the VNF is + # up and running/initialized completely. But we can run + # addition (any) VNF command and wait for it to complete + # as it will be finished ONLY at the end of the VNF + # initialization. So, this approach can be used to + # indentify that VNF is completely initialized. + LOG.info("Got %s VNF prompt.", self.APP_NAME) + prompt_command = "run {}\r\n".format(script_name) + self.q_in.put(prompt_command) + # Cut the buffer since we are not interesting to find + # the VNF prompt anymore + prompt_pos = message.find(self.VNF_PROMPT) + buf = [message[prompt_pos + len(self.VNF_PROMPT):]] + vnf_prompt_found = True + continue + + if done_string in message: + LOG.info("%s VNF is up and running.", self.APP_NAME) + self._vnf_up_post() + self.queue_wrapper.clear() + return self._vnf_process.exitcode + + if "PANIC" in message: + raise RuntimeError("Error starting %s VNF." % + self.APP_NAME) + + LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) + time.sleep(self.WAIT_TIME_FOR_SCRIPT) + # Send command again to display the expected prompt in case the + # expected text was corrupted by other VNF output + self.q_in.put(prompt_command) + + def start_collect(self): + self.resource_helper.start_collect() + + def stop_collect(self): + self.resource_helper.stop_collect() + def _build_run_kwargs(self): self.run_kwargs = { 'stdin': self.queue_wrapper, @@ -811,18 +898,21 @@ class SampleVNF(GenericVNF): def collect_kpi(self): # we can't get KPIs if the VNF is down - check_if_process_failed(self._vnf_process) + check_if_process_failed(self._vnf_process, 0.01) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} if m: - result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()} + result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}) result["collect_stats"] = self.resource_helper.collect_kpi() else: - result = { - "packets_in": 0, - "packets_fwd": 0, - "packets_dropped": 0, - } + result.update({"packets_in": 0, + "packets_fwd": 0, + "packets_dropped": 0}) + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -861,6 +951,39 @@ class SampleVNFTrafficGen(GenericTrafficGen): self.traffic_finished = False self._tg_process = None self._traffic_process = None + self._tasks_queue = JoinableQueue() + self._result_queue = Queue() + + def _test_runner(self, traffic_profile, tasks, results): + self.resource_helper.run_test(traffic_profile, tasks, results) + + def _init_traffic_process(self, traffic_profile): + name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, + traffic_profile.__class__.__name__, + os.getpid()) + self._traffic_process = Process(name=name, target=self._test_runner, + args=( + traffic_profile, self._tasks_queue, + self._result_queue)) + + self._traffic_process.start() + while self.resource_helper.client_started.value == 0: + time.sleep(1) + if not self._traffic_process.is_alive(): + break + + def run_traffic_once(self, traffic_profile): + if self.resource_helper.client_started.value == 0: + self._init_traffic_process(traffic_profile) + + # continue test - run next iteration + LOG.info("Run next iteration ...") + self._tasks_queue.put('RUN_TRAFFIC') + + def wait_on_traffic(self): + self._tasks_queue.join() + result = self._result_queue.get() + return result def _start_server(self): # we can't share ssh paramiko objects to force new connection @@ -868,6 +991,13 @@ class SampleVNFTrafficGen(GenericTrafficGen): def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg + self.resource_helper.update_from_context( + Context.get_context_from_server(self.scenario_helper.nodes[self.name]), + self.scenario_helper.nodes[self.name] + ) + + self.resource_helper.context_cfg = context_cfg + self.resource_helper.setup() # must generate_cfg after DPDK bind because we need port number self.resource_helper.generate_cfg() @@ -922,9 +1052,14 @@ class SampleVNFTrafficGen(GenericTrafficGen): def collect_kpi(self): # check if the tg processes have exited + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} for proc in (self._tg_process, self._traffic_process): check_if_process_failed(proc) - result = self.resource_helper.collect_kpi() + + result["collect_stats"] = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result |