diff options
author | Ross Brattain <ross.b.brattain@intel.com> | 2017-10-03 22:00:55 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@opnfv.org> | 2017-10-03 22:00:55 +0000 |
commit | add9dfa6f8739ec3328388c2658750e185e6a3ed (patch) | |
tree | ef6770a0b7adf6c3faa899770d3812cf266b1eb7 /yardstick | |
parent | eb5b2735fe9907d38fa8f64df1efbe979158d7ec (diff) | |
parent | e228c2a3ac5b0173792fa7b11f9540ecec3a0029 (diff) |
Merge "NSB PROX test hang fixes"
Diffstat (limited to 'yardstick')
18 files changed, 251 insertions, 72 deletions
diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 3ff064ae1..6aaaed888 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -191,7 +191,9 @@ class ArithmeticRunner(base.Runner): __execution_type__ = 'Arithmetic' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index 3ecf67736..13718d793 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -17,13 +17,14 @@ # rally/rally/benchmark/runners/base.py from __future__ import absolute_import -import importlib + import logging import multiprocessing import subprocess import time import traceback +import importlib from six.moves.queue import Empty @@ -36,7 +37,6 @@ log = logging.getLogger(__name__) def _execute_shell_command(command): """execute shell script with error handling""" exitcode = 0 - output = [] try: output = subprocess.check_output(command, shell=True) except Exception: diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index 2cb2600c8..fbf72a74c 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -138,7 +138,9 @@ If the scenario ends before the time has elapsed, it will be started again. __execution_type__ = 'Duration' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py index 01e76c6f4..63bfc823a 100755 --- a/yardstick/benchmark/runners/dynamictp.py +++ b/yardstick/benchmark/runners/dynamictp.py @@ -19,11 +19,12 @@ """A runner that searches for the max throughput with binary search """ -import os -import multiprocessing import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base @@ -65,8 +66,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, max_throuput_found = False sequence = 0 - last_min_data = {} - last_min_data['packets_per_second'] = 0 + last_min_data = {'packets_per_second': 0} while True: sequence += 1 @@ -125,7 +125,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, queue.put(record) max_throuput_found = True - if (errors) or aborted.is_set() or max_throuput_found: + if errors or aborted.is_set() or max_throuput_found: LOG.info("worker END") break @@ -155,7 +155,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, class IterationRunner(base.Runner): - '''Run a scenario to find the max throughput + """Run a scenario to find the max throughput If the scenario ends before the time has elapsed, it will be started again. @@ -168,11 +168,13 @@ If the scenario ends before the time has elapsed, it will be started again. type: int unit: pps default: 1000 pps - ''' + """ __execution_type__ = 'Dynamictp' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted)) diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 88158eed3..cb0424377 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -20,11 +20,13 @@ """ from __future__ import absolute_import -import os -import multiprocessing + import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base @@ -88,9 +90,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, scenario_cfg['options']['rate'] -= delta sequence = 1 continue - except Exception as e: + except Exception: errors = traceback.format_exc() - LOG.exception(e) + LOG.exception("") else: if result: # add timeout for put so we don't block test @@ -151,7 +153,9 @@ If the scenario ends before the time has elapsed, it will be started again. __execution_type__ = 'Iteration' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/runners/search.py b/yardstick/benchmark/runners/search.py index 5948763a7..8037329b5 100644 --- a/yardstick/benchmark/runners/search.py +++ b/yardstick/benchmark/runners/search.py @@ -20,15 +20,16 @@ """ from __future__ import absolute_import -import os -import multiprocessing + import logging -import traceback +import multiprocessing import time - -from collections import Mapping +import traceback from contextlib import contextmanager from itertools import takewhile + +import os +from collections import Mapping from six.moves import zip from yardstick.benchmark.runners import base @@ -173,7 +174,9 @@ If the scenario ends before the time has elapsed, it will be started again. break def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=self._worker_run, args=(cls, method, scenario_cfg, context_cfg)) self.process.start() diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index f08ca5dde..d6e3f7109 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -21,11 +21,13 @@ The input value in the sequence is specified in a list in the input file. """ from __future__ import absolute_import -import os -import multiprocessing + import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base @@ -140,7 +142,9 @@ class SequenceRunner(base.Runner): __execution_type__ = 'Sequence' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py index d9cc0eac1..d85125230 100644 --- a/yardstick/benchmark/scenarios/networking/vnf_generic.py +++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py @@ -30,6 +30,7 @@ from collections import defaultdict from yardstick.benchmark.scenarios import base from yardstick.common.constants import LOG_DIR +from yardstick.common.process import terminate_children from yardstick.common.utils import import_modules_from_package, itersubclasses from yardstick.common.yaml_loader import yaml_load from yardstick.network_services.collector.subscriber import Collector @@ -596,7 +597,8 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ vnf.instantiate(self.scenario_cfg, self.context_cfg) LOG.info("Waiting for %s to instantiate", vnf.name) vnf.wait_for_instantiate() - except RuntimeError: + except: + LOG.exception("") for vnf in self.vnfs: vnf.terminate() raise @@ -635,7 +637,19 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ :return """ - self.collector.stop() - for vnf in self.vnfs: - LOG.info("Stopping %s", vnf.name) - vnf.terminate() + try: + try: + self.collector.stop() + for vnf in self.vnfs: + LOG.info("Stopping %s", vnf.name) + vnf.terminate() + LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs)) + finally: + terminate_children() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise RuntimeError("Error in teardown") diff --git a/yardstick/common/process.py b/yardstick/common/process.py new file mode 100644 index 000000000..812ddea94 --- /dev/null +++ b/yardstick/common/process.py @@ -0,0 +1,47 @@ +# Copyright (c) 2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import multiprocessing + +import os + +LOG = logging.getLogger(__name__) + + +def check_if_process_failed(proc, timeout=1): + if proc is not None: + proc.join(timeout) + # Only abort if the process aborted + if proc.exitcode is not None and proc.exitcode > 0: + raise RuntimeError("{} exited with status {}".format(proc.name, proc.exitcode)) + + +def terminate_children(timeout=3): + current_proccess = multiprocessing.current_process() + active_children = multiprocessing.active_children() + if not active_children: + LOG.debug("no children to terminate") + return + for child in active_children: + LOG.debug("%s %s %s, child: %s %s", current_proccess.name, current_proccess.pid, + os.getpid(), child, child.pid) + LOG.debug("joining %s", child) + child.join(timeout) + child.terminate() + active_children = multiprocessing.active_children() + if not active_children: + LOG.debug("no children to terminate") + for child in active_children: + LOG.debug("%s %s %s, after terminate child: %s %s", current_proccess.name, + current_proccess.pid, os.getpid(), child, child.pid) diff --git a/yardstick/network_services/collector/subscriber.py b/yardstick/network_services/collector/subscriber.py index db52e0b45..d560e1d42 100644 --- a/yardstick/network_services/collector/subscriber.py +++ b/yardstick/network_services/collector/subscriber.py @@ -74,11 +74,12 @@ class Collector(object): # Result example: # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }} LOG.debug("collect KPI for %s", node_name) - if not resource.check_if_sa_running("collectd")[0]: + if resource.check_if_sa_running("collectd")[0] != 0: continue try: results[node_name] = {"core": resource.amqp_collect_nfvi_kpi()} + LOG.debug("%s collect KPIs %s", node_name, results[node_name]['core']) except Exception: LOG.exception("") return results diff --git a/yardstick/network_services/helpers/dpdknicbind_helper.py b/yardstick/network_services/helpers/dpdkbindnic_helper.py index 605d08d38..c07613147 100644 --- a/yardstick/network_services/helpers/dpdknicbind_helper.py +++ b/yardstick/network_services/helpers/dpdkbindnic_helper.py @@ -11,9 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging + import re import itertools +import six + NETWORK_KERNEL = 'network_kernel' NETWORK_DPDK = 'network_dpdk' NETWORK_OTHER = 'network_other' @@ -22,6 +26,9 @@ CRYPTO_DPDK = 'crypto_dpdk' CRYPTO_OTHER = 'crypto_other' +LOG = logging.getLogger(__name__) + + class DpdkBindHelperException(Exception): pass @@ -123,23 +130,31 @@ class DpdkBindHelper(object): @property def interface_driver_map(self): return {interface['vpci']: interface['driver'] - for interface in itertools.chain(*self.dpdk_status.values())} + for interface in itertools.chain.from_iterable(self.dpdk_status.values())} def read_status(self): return self.parse_dpdk_status_output(self._dpdk_execute(self._status_cmd)[1]) - def bind(self, pci, driver, force=True): + def bind(self, pci_addresses, driver, force=True): + # accept single PCI or list of PCI + if isinstance(pci_addresses, six.string_types): + pci_addresses = [pci_addresses] cmd = self.DPDK_BIND_CMD.format(dpdk_nic_bind=self._dpdk_nic_bind, driver=driver, - vpci=' '.join(list(pci)), + vpci=' '.join(list(pci_addresses)), force='--force' if force else '') + LOG.debug(cmd) self._dpdk_execute(cmd) # update the inner status dict self.read_status() def save_used_drivers(self): - self.used_drivers = self.interface_driver_map + # invert the map, so we can bind by driver type + self.used_drivers = {} + # sort for stabililty + for vpci, driver in sorted(self.interface_driver_map.items()): + self.used_drivers.setdefault(driver, []).append(vpci) def rebind_drivers(self, force=True): - for vpci, driver in self.used_drivers.items(): - self.bind(vpci, driver, force) + for driver, vpcis in self.used_drivers.items(): + self.bind(vpcis, driver, force) diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py index 7e8334c73..e3d0e3bca 100644 --- a/yardstick/network_services/nfvi/resource.py +++ b/yardstick/network_services/nfvi/resource.py @@ -19,6 +19,7 @@ from __future__ import print_function import logging from itertools import chain +import errno import jinja2 import os import os.path @@ -72,7 +73,6 @@ class ResourceProfile(object): self.timeout = timeout self.enable = True - self.cores = validate_non_string_sequence(cores, default=[]) self._queue = multiprocessing.Queue() self.amqp_client = None self.port_names = validate_non_string_sequence(port_names, default=[]) @@ -83,8 +83,16 @@ class ResourceProfile(object): def check_if_sa_running(self, process): """ verify if system agent is running """ - status, pid, _ = self.connection.execute("pgrep -f %s" % process) - return status == 0, pid + try: + err, pid, _ = self.connection.execute("pgrep -f %s" % process) + # strip whitespace + return err, pid.strip() + except OSError as e: + if e.errno in {errno.ECONNRESET}: + # if we can't connect to check, then we won't be able to connect to stop it + LOG.exception("can't connect to host to check collectd status") + return 1, None + raise def run_collectd_amqp(self): """ run amqp consumer to collect the NFVi data """ @@ -137,7 +145,7 @@ class ResourceProfile(object): def parse_intel_pmu_stats(cls, key, value): return {''.join(str(v) for v in key): value.split(":")[1]} - def parse_collectd_result(self, metrics, core_list): + def parse_collectd_result(self, metrics): """ convert collectd data into json""" result = { "cpu": {}, @@ -161,8 +169,7 @@ class ResourceProfile(object): if "cpu" in res_key0 or "intel_rdt" in res_key0: cpu_key, name, metric, testcase = \ self.get_cpu_data(res_key0, res_key1, value) - if cpu_key in core_list: - result["cpu"].setdefault(cpu_key, {}).update({name: metric}) + result["cpu"].setdefault(cpu_key, {}).update({name: metric}) elif "memory" in res_key0: result["memory"].update({res_key1: value.split(":")[0]}) @@ -189,8 +196,9 @@ class ResourceProfile(object): def amqp_process_for_nfvi_kpi(self): """ amqp collect and return nfvi kpis """ if self.amqp_client is None and self.enable: - self.amqp_client = \ - multiprocessing.Process(target=self.run_collectd_amqp) + self.amqp_client = multiprocessing.Process( + name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()), + target=self.run_collectd_amqp) self.amqp_client.start() def amqp_collect_nfvi_kpi(self): @@ -201,7 +209,7 @@ class ResourceProfile(object): metric = {} while not self._queue.empty(): metric.update(self._queue.get()) - msg = self.parse_collectd_result(metric, self.cores) + msg = self.parse_collectd_result(metric) return msg def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs): @@ -265,8 +273,8 @@ class ResourceProfile(object): connection.execute("sudo rabbitmqctl authenticate_user admin admin") connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'") - LOG.debug("Start collectd service.....") - connection.execute("sudo %s" % collectd_path) + LOG.debug("Start collectd service..... %s second timeout", self.timeout) + connection.execute("sudo %s" % collectd_path, timeout=self.timeout) LOG.debug("Done") def initiate_systemagent(self, bin_path): @@ -292,13 +300,18 @@ class ResourceProfile(object): LOG.debug("Stop resource monitor...") if self.amqp_client is not None: + # we proper and try to join first + self.amqp_client.join(3) self.amqp_client.terminate() + LOG.debug("Check if %s is running", agent) status, pid = self.check_if_sa_running(agent) - if status == 0: + LOG.debug("status %s pid %s", status, pid) + if status != 0: return - self.connection.execute('sudo kill -9 %s' % pid) - self.connection.execute('sudo pkill -9 %s' % agent) + if pid: + self.connection.execute('sudo kill -9 "%s"' % pid) + self.connection.execute('sudo pkill -9 "%s"' % agent) self.connection.execute('sudo service rabbitmq-server stop') self.connection.execute("sudo rabbitmqctl stop_app") diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 67634a79c..7391633af 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -64,6 +64,8 @@ class VnfdHelper(dict): def __init__(self, *args, **kwargs): super(VnfdHelper, self).__init__(*args, **kwargs) self.port_pairs = PortPairs(self['vdu'][0]['external-interface']) + # port num is not present until binding so we have to memoize + self._port_num_map = {} @property def mgmt_interface(self): @@ -91,12 +93,14 @@ class VnfdHelper(dict): virtual_intf = interface["virtual-interface"] if virtual_intf[key] == value: return interface + raise KeyError() def find_interface(self, **kwargs): key, value = next(iter(kwargs.items())) for interface in self.interfaces: if interface[key] == value: return interface + raise KeyError() # hide dpdk_port_num key so we can abstract def find_interface_by_port(self, port): @@ -105,6 +109,7 @@ class VnfdHelper(dict): # we have to convert to int to compare if int(virtual_intf['dpdk_port_num']) == port: return interface + raise KeyError() def port_num(self, port): # we need interface name -> DPDK port num (PMD ID) -> LINK ID @@ -118,7 +123,8 @@ class VnfdHelper(dict): intf = port else: intf = self.find_interface(name=port) - return int(intf["virtual-interface"]["dpdk_port_num"]) + return self._port_num_map.setdefault(intf["name"], + int(intf["virtual-interface"]["dpdk_port_num"])) def port_nums(self, intfs): return [self.port_num(i) for i in intfs] diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index ba4d44c41..d9fe9a948 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -364,6 +364,7 @@ class ProxSocketHelper(object): """ send data to the remote instance """ LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n')) try: + # TODO: sendall will block, we need a timeout self._sock.sendall(to_send.encode('utf-8')) except: pass @@ -593,6 +594,8 @@ _LOCAL_OBJECT = object() class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): # the actual app is lowercase APP_NAME = 'prox' + # not used for Prox but added for consistency + VNF_TYPE = "PROX" LUA_PARAMETER_NAME = "" LUA_PARAMETER_PEER = { @@ -609,6 +612,8 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): self._prox_config_data = None self.additional_files = {} self.config_queue = Queue() + # allow_exit_without_flush + self.config_queue.cancel_join_thread() self._global_section = None @property diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py index 2ac6ea412..3bfca19aa 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py @@ -16,9 +16,10 @@ import errno import logging +from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxDpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper -from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, PROCESS_JOIN_TIMEOUT LOG = logging.getLogger(__name__) @@ -51,10 +52,18 @@ class ProxApproxVnf(SampleVNF): try: return self.resource_helper.execute(cmd, *args, **kwargs) except OSError as e: - if not ignore_errors or e.errno not in {errno.EPIPE, errno.ESHUTDOWN}: + if e.errno in {errno.EPIPE, errno.ESHUTDOWN, errno.ECONNRESET}: + if ignore_errors: + LOG.debug("ignoring vnf_execute exception %s for command %s", e, cmd) + else: + raise + else: raise def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) + if self.resource_helper is None: result = { "packets_in": 0, @@ -64,12 +73,13 @@ class ProxApproxVnf(SampleVNF): } return result - intf_count = len(self.vnfd_helper.interfaces) - if intf_count not in {1, 2, 4}: + # use all_ports so we only use ports matched in topology + port_count = len(self.vnfd_helper.port_pairs.all_ports) + if port_count not in {1, 2, 4}: raise RuntimeError("Failed ..Invalid no of ports .. " "1, 2 or 4 ports only supported at this time") - port_stats = self.vnf_execute('port_stats', range(intf_count)) + port_stats = self.vnf_execute('port_stats', range(port_count)) try: rx_total = port_stats[6] tx_total = port_stats[7] @@ -94,13 +104,20 @@ class ProxApproxVnf(SampleVNF): self.setup_helper.tear_down() def terminate(self): + # stop collectd first or we get pika errors? + self.resource_helper.stop_collect() # try to quit with socket commands - self.vnf_execute("stop_all") - self.vnf_execute("quit") - # hopefully quit succeeds and socket closes, so ignore force_quit socket errors - self.vnf_execute("force_quit", _ignore_errors=True) - if self._vnf_process: - self._vnf_process.terminate() + # pkill is not matching, debug with pgrep + self.ssh_helper.execute("sudo pgrep -lax %s" % self.setup_helper.APP_NAME) + self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.setup_helper.APP_NAME) + if self._vnf_process.is_alive(): + self.vnf_execute("stop_all") + self.vnf_execute("quit") + # hopefully quit succeeds and socket closes, so ignore force_quit socket errors + self.vnf_execute("force_quit", _ignore_errors=True) self.setup_helper.kill_vnf() self._tear_down() - self.resource_helper.stop_collect() + if self._vnf_process is not None: + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index bbaae39e3..114153dab 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -29,10 +29,11 @@ from six.moves import cStringIO from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file +from yardstick.common.process import check_if_process_failed from yardstick.network_services.helpers.cpu import CpuSysCores from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig -from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper +from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper from yardstick.network_services.nfvi.resource import ResourceProfile from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper @@ -51,6 +52,8 @@ LOG = logging.getLogger(__name__) REMOTE_TMP = "/tmp" +DEFAULT_VNF_TIMEOUT = 3600 +PROCESS_JOIN_TIMEOUT = 3 class VnfSshHelper(AutoConnectSSH): @@ -290,8 +293,12 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): return resource def kill_vnf(self): + # pkill is not matching, debug with pgrep + self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME) + self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME) # have to use exact match - self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME) + # try using killall to match + self.ssh_helper.execute("sudo killall %s" % self.APP_NAME) def _setup_dpdk(self): """ setup dpdk environment needed for vnf to run """ @@ -330,8 +337,10 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): port_names = (intf["name"] for intf in ports) collectd_options = self.get_collectd_options() plugins = collectd_options.get("plugins", {}) + # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores, - plugins=plugins, interval=collectd_options.get("interval")) + plugins=plugins, interval=collectd_options.get("interval"), + timeout=self.scenario_helper.timeout) def _detect_and_bind_drivers(self): interfaces = self.vnfd_helper.interfaces @@ -386,7 +395,7 @@ class ResourceHelper(object): def _collect_resource_kpi(self): result = {} status = self.resource.check_if_sa_running("collectd")[0] - if status: + if status == 0: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} @@ -672,12 +681,18 @@ class ScenarioHelper(object): def topology(self): return self.scenario_cfg['topology'] + @property + def timeout(self): + return self.options.get('timeout', DEFAULT_VNF_TIMEOUT) + class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ VNF_PROMPT = "pipeline>" WAIT_TIME = 1 + APP_NAME = "SampleVNF" + # we run the VNF interactively, so the ssh command will timeout after this long def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNF, self).__init__(name, vnfd) @@ -760,7 +775,8 @@ class SampleVNF(GenericVNF): def _start_vnf(self): self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) - self._vnf_process = Process(target=self._run) + name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._vnf_process = Process(name=name, target=self._run) self._vnf_process.start() def _vnf_up_post(self): @@ -810,6 +826,7 @@ class SampleVNF(GenericVNF): 'stdout': self.queue_wrapper, 'keep_stdin_open': True, 'pty': True, + 'timeout': self.scenario_helper.timeout, } def _build_config(self): @@ -842,11 +859,15 @@ class SampleVNF(GenericVNF): def terminate(self): self.vnf_execute("quit") - if self._vnf_process: - self._vnf_process.terminate() self.setup_helper.kill_vnf() self._tear_down() self.resource_helper.stop_collect() + if self._vnf_process is not None: + # be proper and join first before we kill + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() + # no terminate children here because we share processes with tg def get_stats(self, *args, **kwargs): """ @@ -860,6 +881,8 @@ class SampleVNF(GenericVNF): return out def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) if m: @@ -884,7 +907,6 @@ class SampleVNFTrafficGen(GenericTrafficGen): def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNFTrafficGen, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') - self.name = "tgen__1" # name in topology file self.scenario_helper = ScenarioHelper(self.name) self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True) @@ -916,7 +938,8 @@ class SampleVNFTrafficGen(GenericTrafficGen): self.resource_helper.setup() LOG.info("Starting %s server...", self.APP_NAME) - self._tg_process = Process(target=self._start_server) + name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._tg_process = Process(name=name, target=self._start_server) self._tg_process.start() def wait_for_instantiate(self): @@ -952,7 +975,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - self._traffic_process = Process(target=self._traffic_runner, + name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + os.getpid()) + self._traffic_process = Process(name=name, target=self._traffic_runner, args=(traffic_profile,)) self._traffic_process.start() # Wait for traffic process to start @@ -983,6 +1008,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): pass def collect_kpi(self): + # check if the tg processes have exited + for proc in (self._tg_process, self._traffic_process): + check_if_process_failed(proc) result = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -993,5 +1021,15 @@ class SampleVNFTrafficGen(GenericTrafficGen): :return: True/False """ self.traffic_finished = True + # we must kill client before we kill the server, or the client will raise exception if self._traffic_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._traffic_process.name) + self._traffic_process.join(PROCESS_JOIN_TIMEOUT) self._traffic_process.terminate() + if self._tg_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._tg_process.name) + self._tg_process.join(PROCESS_JOIN_TIMEOUT) + self._tg_process.terminate() + # no terminate children here because we share processes with vnf diff --git a/yardstick/network_services/vnf_generic/vnf/udp_replay.py b/yardstick/network_services/vnf_generic/vnf/udp_replay.py index 6b7779782..a57f53bc7 100644 --- a/yardstick/network_services/vnf_generic/vnf/udp_replay.py +++ b/yardstick/network_services/vnf_generic/vnf/udp_replay.py @@ -15,6 +15,7 @@ from __future__ import absolute_import import logging +from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper @@ -107,6 +108,8 @@ class UdpReplayApproxVnf(SampleVNF): def collect_kpi(self): def get_sum(offset): return sum(int(i) for i in split_stats[offset::5]) + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) number_of_ports = len(self.vnfd_helper.port_pairs.all_ports) diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index 5f1c4d4d3..c02c0eb27 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -24,6 +24,7 @@ import posixpath from six.moves import configparser, zip +from yardstick.common.process import check_if_process_failed from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.pipeline import PipelineRules from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper @@ -278,6 +279,8 @@ class VpeApproxVnf(SampleVNF): raise NotImplementedError def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) result = { 'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0, |