aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick
diff options
context:
space:
mode:
authorRoss Brattain <ross.b.brattain@intel.com>2017-09-28 00:10:43 -0700
committerMaciej Skrocki <maciej.skrocki@intel.com>2017-10-03 21:54:33 +0000
commite228c2a3ac5b0173792fa7b11f9540ecec3a0029 (patch)
tree4d4b8c940c90bc5f6827475eaa45a131c6a3d246 /yardstick
parente8cf6a76c346806b53fa0c802374327ddc4956d1 (diff)
NSB PROX test hang fixes
The PROX tests were hanging in the duration runner. These are fixes for various errors:

- raise error in collect_kpi if VNF is down
- move prox dpdk_rebind after collectd stop
- fix dpdk nicbind rebind to group by drivers
- prox: raise error in collect_kpi if the VNF is down
- prox: add VNF_TYPE for consistency
- sample_vnf: debug and fix kill_vnf; pkill is not matching some executable names, so add some debug process dumps and try switching back to killall until we can find the issue
- sample_vnf: add default timeout, so we can override the default 3600 SSH timeout
- collect_kpi is the point at which we check the VNFs and TGs for failures or exits
- queues are the problem: make sure we aren't silently blocking on non-empty queues by canceling the join thread in the subprocess
- fix up the duration runner to close queues; another attempt to stop the duration runner from hanging
- VnfdHelper: memoize port_num
- resource: fail if ssh can't connect; at the end of a 3600-second test our ssh connection is dead, so we can't actually stop collectd unless we reconnect
- fix stop() logic to ignore ssh errors

Change-Id: I6c8e682a80cb9d00362e2fef4a46df080f304e55 Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
Diffstat (limited to 'yardstick')
-rwxr-xr-xyardstick/benchmark/runners/arithmetic.py2
-rwxr-xr-xyardstick/benchmark/runners/base.py4
-rw-r--r--yardstick/benchmark/runners/duration.py2
-rwxr-xr-xyardstick/benchmark/runners/dynamictp.py18
-rw-r--r--yardstick/benchmark/runners/iteration.py14
-rw-r--r--yardstick/benchmark/runners/search.py13
-rw-r--r--yardstick/benchmark/runners/sequence.py10
-rw-r--r--yardstick/benchmark/scenarios/networking/vnf_generic.py24
-rw-r--r--yardstick/common/process.py47
-rw-r--r--yardstick/network_services/collector/subscriber.py3
-rw-r--r--yardstick/network_services/helpers/dpdkbindnic_helper.py (renamed from yardstick/network_services/helpers/dpdknicbind_helper.py)27
-rw-r--r--yardstick/network_services/nfvi/resource.py41
-rw-r--r--yardstick/network_services/vnf_generic/vnf/base.py8
-rw-r--r--yardstick/network_services/vnf_generic/vnf/prox_helpers.py5
-rw-r--r--yardstick/network_services/vnf_generic/vnf/prox_vnf.py41
-rw-r--r--yardstick/network_services/vnf_generic/vnf/sample_vnf.py58
-rw-r--r--yardstick/network_services/vnf_generic/vnf/udp_replay.py3
-rw-r--r--yardstick/network_services/vnf_generic/vnf/vpe_vnf.py3
18 files changed, 251 insertions, 72 deletions
diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py
index 3ff064ae1..6aaaed888 100755
--- a/yardstick/benchmark/runners/arithmetic.py
+++ b/yardstick/benchmark/runners/arithmetic.py
@@ -191,7 +191,9 @@ class ArithmeticRunner(base.Runner):
__execution_type__ = 'Arithmetic'
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
self.process = multiprocessing.Process(
+ name=name,
target=_worker_process,
args=(self.result_queue, cls, method, scenario_cfg,
context_cfg, self.aborted, self.output_queue))
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index 3ecf67736..13718d793 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -17,13 +17,14 @@
# rally/rally/benchmark/runners/base.py
from __future__ import absolute_import
-import importlib
+
import logging
import multiprocessing
import subprocess
import time
import traceback
+import importlib
from six.moves.queue import Empty
@@ -36,7 +37,6 @@ log = logging.getLogger(__name__)
def _execute_shell_command(command):
"""execute shell script with error handling"""
exitcode = 0
- output = []
try:
output = subprocess.check_output(command, shell=True)
except Exception:
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index 2cb2600c8..fbf72a74c 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -138,7 +138,9 @@ If the scenario ends before the time has elapsed, it will be started again.
__execution_type__ = 'Duration'
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
self.process = multiprocessing.Process(
+ name=name,
target=_worker_process,
args=(self.result_queue, cls, method, scenario_cfg,
context_cfg, self.aborted, self.output_queue))
diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py
index 01e76c6f4..63bfc823a 100755
--- a/yardstick/benchmark/runners/dynamictp.py
+++ b/yardstick/benchmark/runners/dynamictp.py
@@ -19,11 +19,12 @@
"""A runner that searches for the max throughput with binary search
"""
-import os
-import multiprocessing
import logging
-import traceback
+import multiprocessing
import time
+import traceback
+
+import os
from yardstick.benchmark.runners import base
@@ -65,8 +66,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
max_throuput_found = False
sequence = 0
- last_min_data = {}
- last_min_data['packets_per_second'] = 0
+ last_min_data = {'packets_per_second': 0}
while True:
sequence += 1
@@ -125,7 +125,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
queue.put(record)
max_throuput_found = True
- if (errors) or aborted.is_set() or max_throuput_found:
+ if errors or aborted.is_set() or max_throuput_found:
LOG.info("worker END")
break
@@ -155,7 +155,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
class IterationRunner(base.Runner):
- '''Run a scenario to find the max throughput
+ """Run a scenario to find the max throughput
If the scenario ends before the time has elapsed, it will be started again.
@@ -168,11 +168,13 @@ If the scenario ends before the time has elapsed, it will be started again.
type: int
unit: pps
default: 1000 pps
- '''
+ """
__execution_type__ = 'Dynamictp'
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
self.process = multiprocessing.Process(
+ name=name,
target=_worker_process,
args=(self.result_queue, cls, method, scenario_cfg,
context_cfg, self.aborted))
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 88158eed3..cb0424377 100644
--- a/yardstick/benchmark/runners/iteration.py
+++ b/yardstick/benchmark/runners/iteration.py
@@ -20,11 +20,13 @@
"""
from __future__ import absolute_import
-import os
-import multiprocessing
+
import logging
-import traceback
+import multiprocessing
import time
+import traceback
+
+import os
from yardstick.benchmark.runners import base
@@ -88,9 +90,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
scenario_cfg['options']['rate'] -= delta
sequence = 1
continue
- except Exception as e:
+ except Exception:
errors = traceback.format_exc()
- LOG.exception(e)
+ LOG.exception("")
else:
if result:
# add timeout for put so we don't block test
@@ -151,7 +153,9 @@ If the scenario ends before the time has elapsed, it will be started again.
__execution_type__ = 'Iteration'
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
self.process = multiprocessing.Process(
+ name=name,
target=_worker_process,
args=(self.result_queue, cls, method, scenario_cfg,
context_cfg, self.aborted, self.output_queue))
diff --git a/yardstick/benchmark/runners/search.py b/yardstick/benchmark/runners/search.py
index 5948763a7..8037329b5 100644
--- a/yardstick/benchmark/runners/search.py
+++ b/yardstick/benchmark/runners/search.py
@@ -20,15 +20,16 @@
"""
from __future__ import absolute_import
-import os
-import multiprocessing
+
import logging
-import traceback
+import multiprocessing
import time
-
-from collections import Mapping
+import traceback
from contextlib import contextmanager
from itertools import takewhile
+
+import os
+from collections import Mapping
from six.moves import zip
from yardstick.benchmark.runners import base
@@ -173,7 +174,9 @@ If the scenario ends before the time has elapsed, it will be started again.
break
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
self.process = multiprocessing.Process(
+ name=name,
target=self._worker_run,
args=(cls, method, scenario_cfg, context_cfg))
self.process.start()
diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py
index f08ca5dde..d6e3f7109 100644
--- a/yardstick/benchmark/runners/sequence.py
+++ b/yardstick/benchmark/runners/sequence.py
@@ -21,11 +21,13 @@ The input value in the sequence is specified in a list in the input file.
"""
from __future__ import absolute_import
-import os
-import multiprocessing
+
import logging
-import traceback
+import multiprocessing
import time
+import traceback
+
+import os
from yardstick.benchmark.runners import base
@@ -140,7 +142,9 @@ class SequenceRunner(base.Runner):
__execution_type__ = 'Sequence'
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
self.process = multiprocessing.Process(
+ name=name,
target=_worker_process,
args=(self.result_queue, cls, method, scenario_cfg,
context_cfg, self.aborted, self.output_queue))
diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py
index d9cc0eac1..d85125230 100644
--- a/yardstick/benchmark/scenarios/networking/vnf_generic.py
+++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py
@@ -30,6 +30,7 @@ from collections import defaultdict
from yardstick.benchmark.scenarios import base
from yardstick.common.constants import LOG_DIR
+from yardstick.common.process import terminate_children
from yardstick.common.utils import import_modules_from_package, itersubclasses
from yardstick.common.yaml_loader import yaml_load
from yardstick.network_services.collector.subscriber import Collector
@@ -596,7 +597,8 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
vnf.instantiate(self.scenario_cfg, self.context_cfg)
LOG.info("Waiting for %s to instantiate", vnf.name)
vnf.wait_for_instantiate()
- except RuntimeError:
+ except:
+ LOG.exception("")
for vnf in self.vnfs:
vnf.terminate()
raise
@@ -635,7 +637,19 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
:return
"""
- self.collector.stop()
- for vnf in self.vnfs:
- LOG.info("Stopping %s", vnf.name)
- vnf.terminate()
+ try:
+ try:
+ self.collector.stop()
+ for vnf in self.vnfs:
+ LOG.info("Stopping %s", vnf.name)
+ vnf.terminate()
+ LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
+ finally:
+ terminate_children()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise RuntimeError("Error in teardown")
diff --git a/yardstick/common/process.py b/yardstick/common/process.py
new file mode 100644
index 000000000..812ddea94
--- /dev/null
+++ b/yardstick/common/process.py
@@ -0,0 +1,47 @@
+# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import multiprocessing
+
+import os
+
+LOG = logging.getLogger(__name__)
+
+
+def check_if_process_failed(proc, timeout=1):
+ if proc is not None:
+ proc.join(timeout)
+ # Only abort if the process aborted
+ if proc.exitcode is not None and proc.exitcode > 0:
+ raise RuntimeError("{} exited with status {}".format(proc.name, proc.exitcode))
+
+
+def terminate_children(timeout=3):
+ current_proccess = multiprocessing.current_process()
+ active_children = multiprocessing.active_children()
+ if not active_children:
+ LOG.debug("no children to terminate")
+ return
+ for child in active_children:
+ LOG.debug("%s %s %s, child: %s %s", current_proccess.name, current_proccess.pid,
+ os.getpid(), child, child.pid)
+ LOG.debug("joining %s", child)
+ child.join(timeout)
+ child.terminate()
+ active_children = multiprocessing.active_children()
+ if not active_children:
+ LOG.debug("no children to terminate")
+ for child in active_children:
+ LOG.debug("%s %s %s, after terminate child: %s %s", current_proccess.name,
+ current_proccess.pid, os.getpid(), child, child.pid)
diff --git a/yardstick/network_services/collector/subscriber.py b/yardstick/network_services/collector/subscriber.py
index db52e0b45..d560e1d42 100644
--- a/yardstick/network_services/collector/subscriber.py
+++ b/yardstick/network_services/collector/subscriber.py
@@ -74,11 +74,12 @@ class Collector(object):
# Result example:
# {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }}
LOG.debug("collect KPI for %s", node_name)
- if not resource.check_if_sa_running("collectd")[0]:
+ if resource.check_if_sa_running("collectd")[0] != 0:
continue
try:
results[node_name] = {"core": resource.amqp_collect_nfvi_kpi()}
+ LOG.debug("%s collect KPIs %s", node_name, results[node_name]['core'])
except Exception:
LOG.exception("")
return results
diff --git a/yardstick/network_services/helpers/dpdknicbind_helper.py b/yardstick/network_services/helpers/dpdkbindnic_helper.py
index 605d08d38..c07613147 100644
--- a/yardstick/network_services/helpers/dpdknicbind_helper.py
+++ b/yardstick/network_services/helpers/dpdkbindnic_helper.py
@@ -11,9 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
import re
import itertools
+import six
+
NETWORK_KERNEL = 'network_kernel'
NETWORK_DPDK = 'network_dpdk'
NETWORK_OTHER = 'network_other'
@@ -22,6 +26,9 @@ CRYPTO_DPDK = 'crypto_dpdk'
CRYPTO_OTHER = 'crypto_other'
+LOG = logging.getLogger(__name__)
+
+
class DpdkBindHelperException(Exception):
pass
@@ -123,23 +130,31 @@ class DpdkBindHelper(object):
@property
def interface_driver_map(self):
return {interface['vpci']: interface['driver']
- for interface in itertools.chain(*self.dpdk_status.values())}
+ for interface in itertools.chain.from_iterable(self.dpdk_status.values())}
def read_status(self):
return self.parse_dpdk_status_output(self._dpdk_execute(self._status_cmd)[1])
- def bind(self, pci, driver, force=True):
+ def bind(self, pci_addresses, driver, force=True):
+ # accept single PCI or list of PCI
+ if isinstance(pci_addresses, six.string_types):
+ pci_addresses = [pci_addresses]
cmd = self.DPDK_BIND_CMD.format(dpdk_nic_bind=self._dpdk_nic_bind,
driver=driver,
- vpci=' '.join(list(pci)),
+ vpci=' '.join(list(pci_addresses)),
force='--force' if force else '')
+ LOG.debug(cmd)
self._dpdk_execute(cmd)
# update the inner status dict
self.read_status()
def save_used_drivers(self):
- self.used_drivers = self.interface_driver_map
+ # invert the map, so we can bind by driver type
+ self.used_drivers = {}
+ # sort for stabililty
+ for vpci, driver in sorted(self.interface_driver_map.items()):
+ self.used_drivers.setdefault(driver, []).append(vpci)
def rebind_drivers(self, force=True):
- for vpci, driver in self.used_drivers.items():
- self.bind(vpci, driver, force)
+ for driver, vpcis in self.used_drivers.items():
+ self.bind(vpcis, driver, force)
diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py
index 7e8334c73..e3d0e3bca 100644
--- a/yardstick/network_services/nfvi/resource.py
+++ b/yardstick/network_services/nfvi/resource.py
@@ -19,6 +19,7 @@ from __future__ import print_function
import logging
from itertools import chain
+import errno
import jinja2
import os
import os.path
@@ -72,7 +73,6 @@ class ResourceProfile(object):
self.timeout = timeout
self.enable = True
- self.cores = validate_non_string_sequence(cores, default=[])
self._queue = multiprocessing.Queue()
self.amqp_client = None
self.port_names = validate_non_string_sequence(port_names, default=[])
@@ -83,8 +83,16 @@ class ResourceProfile(object):
def check_if_sa_running(self, process):
""" verify if system agent is running """
- status, pid, _ = self.connection.execute("pgrep -f %s" % process)
- return status == 0, pid
+ try:
+ err, pid, _ = self.connection.execute("pgrep -f %s" % process)
+ # strip whitespace
+ return err, pid.strip()
+ except OSError as e:
+ if e.errno in {errno.ECONNRESET}:
+ # if we can't connect to check, then we won't be able to connect to stop it
+ LOG.exception("can't connect to host to check collectd status")
+ return 1, None
+ raise
def run_collectd_amqp(self):
""" run amqp consumer to collect the NFVi data """
@@ -137,7 +145,7 @@ class ResourceProfile(object):
def parse_intel_pmu_stats(cls, key, value):
return {''.join(str(v) for v in key): value.split(":")[1]}
- def parse_collectd_result(self, metrics, core_list):
+ def parse_collectd_result(self, metrics):
""" convert collectd data into json"""
result = {
"cpu": {},
@@ -161,8 +169,7 @@ class ResourceProfile(object):
if "cpu" in res_key0 or "intel_rdt" in res_key0:
cpu_key, name, metric, testcase = \
self.get_cpu_data(res_key0, res_key1, value)
- if cpu_key in core_list:
- result["cpu"].setdefault(cpu_key, {}).update({name: metric})
+ result["cpu"].setdefault(cpu_key, {}).update({name: metric})
elif "memory" in res_key0:
result["memory"].update({res_key1: value.split(":")[0]})
@@ -189,8 +196,9 @@ class ResourceProfile(object):
def amqp_process_for_nfvi_kpi(self):
""" amqp collect and return nfvi kpis """
if self.amqp_client is None and self.enable:
- self.amqp_client = \
- multiprocessing.Process(target=self.run_collectd_amqp)
+ self.amqp_client = multiprocessing.Process(
+ name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
+ target=self.run_collectd_amqp)
self.amqp_client.start()
def amqp_collect_nfvi_kpi(self):
@@ -201,7 +209,7 @@ class ResourceProfile(object):
metric = {}
while not self._queue.empty():
metric.update(self._queue.get())
- msg = self.parse_collectd_result(metric, self.cores)
+ msg = self.parse_collectd_result(metric)
return msg
def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
@@ -265,8 +273,8 @@ class ResourceProfile(object):
connection.execute("sudo rabbitmqctl authenticate_user admin admin")
connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'")
- LOG.debug("Start collectd service.....")
- connection.execute("sudo %s" % collectd_path)
+ LOG.debug("Start collectd service..... %s second timeout", self.timeout)
+ connection.execute("sudo %s" % collectd_path, timeout=self.timeout)
LOG.debug("Done")
def initiate_systemagent(self, bin_path):
@@ -292,13 +300,18 @@ class ResourceProfile(object):
LOG.debug("Stop resource monitor...")
if self.amqp_client is not None:
+ # we proper and try to join first
+ self.amqp_client.join(3)
self.amqp_client.terminate()
+ LOG.debug("Check if %s is running", agent)
status, pid = self.check_if_sa_running(agent)
- if status == 0:
+ LOG.debug("status %s pid %s", status, pid)
+ if status != 0:
return
- self.connection.execute('sudo kill -9 %s' % pid)
- self.connection.execute('sudo pkill -9 %s' % agent)
+ if pid:
+ self.connection.execute('sudo kill -9 "%s"' % pid)
+ self.connection.execute('sudo pkill -9 "%s"' % agent)
self.connection.execute('sudo service rabbitmq-server stop')
self.connection.execute("sudo rabbitmqctl stop_app")
diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py
index 67634a79c..7391633af 100644
--- a/yardstick/network_services/vnf_generic/vnf/base.py
+++ b/yardstick/network_services/vnf_generic/vnf/base.py
@@ -64,6 +64,8 @@ class VnfdHelper(dict):
def __init__(self, *args, **kwargs):
super(VnfdHelper, self).__init__(*args, **kwargs)
self.port_pairs = PortPairs(self['vdu'][0]['external-interface'])
+ # port num is not present until binding so we have to memoize
+ self._port_num_map = {}
@property
def mgmt_interface(self):
@@ -91,12 +93,14 @@ class VnfdHelper(dict):
virtual_intf = interface["virtual-interface"]
if virtual_intf[key] == value:
return interface
+ raise KeyError()
def find_interface(self, **kwargs):
key, value = next(iter(kwargs.items()))
for interface in self.interfaces:
if interface[key] == value:
return interface
+ raise KeyError()
# hide dpdk_port_num key so we can abstract
def find_interface_by_port(self, port):
@@ -105,6 +109,7 @@ class VnfdHelper(dict):
# we have to convert to int to compare
if int(virtual_intf['dpdk_port_num']) == port:
return interface
+ raise KeyError()
def port_num(self, port):
# we need interface name -> DPDK port num (PMD ID) -> LINK ID
@@ -118,7 +123,8 @@ class VnfdHelper(dict):
intf = port
else:
intf = self.find_interface(name=port)
- return int(intf["virtual-interface"]["dpdk_port_num"])
+ return self._port_num_map.setdefault(intf["name"],
+ int(intf["virtual-interface"]["dpdk_port_num"]))
def port_nums(self, intfs):
return [self.port_num(i) for i in intfs]
diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
index ba4d44c41..d9fe9a948 100644
--- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
+++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
@@ -364,6 +364,7 @@ class ProxSocketHelper(object):
""" send data to the remote instance """
LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
try:
+ # TODO: sendall will block, we need a timeout
self._sock.sendall(to_send.encode('utf-8'))
except:
pass
@@ -593,6 +594,8 @@ _LOCAL_OBJECT = object()
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
# the actual app is lowercase
APP_NAME = 'prox'
+ # not used for Prox but added for consistency
+ VNF_TYPE = "PROX"
LUA_PARAMETER_NAME = ""
LUA_PARAMETER_PEER = {
@@ -609,6 +612,8 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
self._prox_config_data = None
self.additional_files = {}
self.config_queue = Queue()
+ # allow_exit_without_flush
+ self.config_queue.cancel_join_thread()
self._global_section = None
@property
diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py
index 2ac6ea412..3bfca19aa 100644
--- a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py
@@ -16,9 +16,10 @@ import errno
import logging
+from yardstick.common.process import check_if_process_failed
from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxDpdkVnfSetupEnvHelper
from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper
-from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, PROCESS_JOIN_TIMEOUT
LOG = logging.getLogger(__name__)
@@ -51,10 +52,18 @@ class ProxApproxVnf(SampleVNF):
try:
return self.resource_helper.execute(cmd, *args, **kwargs)
except OSError as e:
- if not ignore_errors or e.errno not in {errno.EPIPE, errno.ESHUTDOWN}:
+ if e.errno in {errno.EPIPE, errno.ESHUTDOWN, errno.ECONNRESET}:
+ if ignore_errors:
+ LOG.debug("ignoring vnf_execute exception %s for command %s", e, cmd)
+ else:
+ raise
+ else:
raise
def collect_kpi(self):
+ # we can't get KPIs if the VNF is down
+ check_if_process_failed(self._vnf_process)
+
if self.resource_helper is None:
result = {
"packets_in": 0,
@@ -64,12 +73,13 @@ class ProxApproxVnf(SampleVNF):
}
return result
- intf_count = len(self.vnfd_helper.interfaces)
- if intf_count not in {1, 2, 4}:
+ # use all_ports so we only use ports matched in topology
+ port_count = len(self.vnfd_helper.port_pairs.all_ports)
+ if port_count not in {1, 2, 4}:
raise RuntimeError("Failed ..Invalid no of ports .. "
"1, 2 or 4 ports only supported at this time")
- port_stats = self.vnf_execute('port_stats', range(intf_count))
+ port_stats = self.vnf_execute('port_stats', range(port_count))
try:
rx_total = port_stats[6]
tx_total = port_stats[7]
@@ -94,13 +104,20 @@ class ProxApproxVnf(SampleVNF):
self.setup_helper.tear_down()
def terminate(self):
+ # stop collectd first or we get pika errors?
+ self.resource_helper.stop_collect()
# try to quit with socket commands
- self.vnf_execute("stop_all")
- self.vnf_execute("quit")
- # hopefully quit succeeds and socket closes, so ignore force_quit socket errors
- self.vnf_execute("force_quit", _ignore_errors=True)
- if self._vnf_process:
- self._vnf_process.terminate()
+ # pkill is not matching, debug with pgrep
+ self.ssh_helper.execute("sudo pgrep -lax %s" % self.setup_helper.APP_NAME)
+ self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.setup_helper.APP_NAME)
+ if self._vnf_process.is_alive():
+ self.vnf_execute("stop_all")
+ self.vnf_execute("quit")
+ # hopefully quit succeeds and socket closes, so ignore force_quit socket errors
+ self.vnf_execute("force_quit", _ignore_errors=True)
self.setup_helper.kill_vnf()
self._tear_down()
- self.resource_helper.stop_collect()
+ if self._vnf_process is not None:
+ LOG.debug("joining before terminate %s", self._vnf_process.name)
+ self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
+ self._vnf_process.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
index bbaae39e3..114153dab 100644
--- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -29,10 +29,11 @@ from six.moves import cStringIO
from yardstick.benchmark.contexts.base import Context
from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.common.process import check_if_process_failed
from yardstick.network_services.helpers.cpu import CpuSysCores
from yardstick.network_services.helpers.samplevnf_helper import PortPairs
from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
-from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
+from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
from yardstick.network_services.nfvi.resource import ResourceProfile
from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
@@ -51,6 +52,8 @@ LOG = logging.getLogger(__name__)
REMOTE_TMP = "/tmp"
+DEFAULT_VNF_TIMEOUT = 3600
+PROCESS_JOIN_TIMEOUT = 3
class VnfSshHelper(AutoConnectSSH):
@@ -290,8 +293,12 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
return resource
def kill_vnf(self):
+ # pkill is not matching, debug with pgrep
+ self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
+ self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
# have to use exact match
- self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)
+ # try using killall to match
+ self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
def _setup_dpdk(self):
""" setup dpdk environment needed for vnf to run """
@@ -330,8 +337,10 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
port_names = (intf["name"] for intf in ports)
collectd_options = self.get_collectd_options()
plugins = collectd_options.get("plugins", {})
+ # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores,
- plugins=plugins, interval=collectd_options.get("interval"))
+ plugins=plugins, interval=collectd_options.get("interval"),
+ timeout=self.scenario_helper.timeout)
def _detect_and_bind_drivers(self):
interfaces = self.vnfd_helper.interfaces
@@ -386,7 +395,7 @@ class ResourceHelper(object):
def _collect_resource_kpi(self):
result = {}
status = self.resource.check_if_sa_running("collectd")[0]
- if status:
+ if status == 0:
result = self.resource.amqp_collect_nfvi_kpi()
result = {"core": result}
@@ -672,12 +681,18 @@ class ScenarioHelper(object):
def topology(self):
return self.scenario_cfg['topology']
+ @property
+ def timeout(self):
+ return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
+
class SampleVNF(GenericVNF):
""" Class providing file-like API for generic VNF implementation """
VNF_PROMPT = "pipeline>"
WAIT_TIME = 1
+ APP_NAME = "SampleVNF"
+ # we run the VNF interactively, so the ssh command will timeout after this long
def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
super(SampleVNF, self).__init__(name, vnfd)
@@ -760,7 +775,8 @@ class SampleVNF(GenericVNF):
def _start_vnf(self):
self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
- self._vnf_process = Process(target=self._run)
+ name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
+ self._vnf_process = Process(name=name, target=self._run)
self._vnf_process.start()
def _vnf_up_post(self):
@@ -810,6 +826,7 @@ class SampleVNF(GenericVNF):
'stdout': self.queue_wrapper,
'keep_stdin_open': True,
'pty': True,
+ 'timeout': self.scenario_helper.timeout,
}
def _build_config(self):
@@ -842,11 +859,15 @@ class SampleVNF(GenericVNF):
def terminate(self):
self.vnf_execute("quit")
- if self._vnf_process:
- self._vnf_process.terminate()
self.setup_helper.kill_vnf()
self._tear_down()
self.resource_helper.stop_collect()
+ if self._vnf_process is not None:
+ # be proper and join first before we kill
+ LOG.debug("joining before terminate %s", self._vnf_process.name)
+ self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
+ self._vnf_process.terminate()
+ # no terminate children here because we share processes with tg
def get_stats(self, *args, **kwargs):
"""
@@ -860,6 +881,8 @@ class SampleVNF(GenericVNF):
return out
def collect_kpi(self):
+ # we can't get KPIs if the VNF is down
+ check_if_process_failed(self._vnf_process)
stats = self.get_stats()
m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
if m:
@@ -884,7 +907,6 @@ class SampleVNFTrafficGen(GenericTrafficGen):
def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
super(SampleVNFTrafficGen, self).__init__(name, vnfd)
self.bin_path = get_nsb_option('bin_path', '')
- self.name = "tgen__1" # name in topology file
self.scenario_helper = ScenarioHelper(self.name)
self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
@@ -916,7 +938,8 @@ class SampleVNFTrafficGen(GenericTrafficGen):
self.resource_helper.setup()
LOG.info("Starting %s server...", self.APP_NAME)
- self._tg_process = Process(target=self._start_server)
+ name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
+ self._tg_process = Process(name=name, target=self._start_server)
self._tg_process.start()
def wait_for_instantiate(self):
@@ -952,7 +975,9 @@ class SampleVNFTrafficGen(GenericTrafficGen):
:param traffic_profile:
:return: True/False
"""
- self._traffic_process = Process(target=self._traffic_runner,
+ name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+ os.getpid())
+ self._traffic_process = Process(name=name, target=self._traffic_runner,
args=(traffic_profile,))
self._traffic_process.start()
# Wait for traffic process to start
@@ -983,6 +1008,9 @@ class SampleVNFTrafficGen(GenericTrafficGen):
pass
def collect_kpi(self):
+ # check if the tg processes have exited
+ for proc in (self._tg_process, self._traffic_process):
+ check_if_process_failed(proc)
result = self.resource_helper.collect_kpi()
LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
return result
@@ -993,5 +1021,15 @@ class SampleVNFTrafficGen(GenericTrafficGen):
:return: True/False
"""
self.traffic_finished = True
+ # we must kill client before we kill the server, or the client will raise exception
if self._traffic_process is not None:
+ # be proper and try to join before terminating
+ LOG.debug("joining before terminate %s", self._traffic_process.name)
+ self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
self._traffic_process.terminate()
+ if self._tg_process is not None:
+ # be proper and try to join before terminating
+ LOG.debug("joining before terminate %s", self._tg_process.name)
+ self._tg_process.join(PROCESS_JOIN_TIMEOUT)
+ self._tg_process.terminate()
+ # no terminate children here because we share processes with vnf
diff --git a/yardstick/network_services/vnf_generic/vnf/udp_replay.py b/yardstick/network_services/vnf_generic/vnf/udp_replay.py
index 6b7779782..a57f53bc7 100644
--- a/yardstick/network_services/vnf_generic/vnf/udp_replay.py
+++ b/yardstick/network_services/vnf_generic/vnf/udp_replay.py
@@ -15,6 +15,7 @@
from __future__ import absolute_import
import logging
+from yardstick.common.process import check_if_process_failed
from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF
from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
@@ -107,6 +108,8 @@ class UdpReplayApproxVnf(SampleVNF):
def collect_kpi(self):
def get_sum(offset):
return sum(int(i) for i in split_stats[offset::5])
+ # we can't get KPIs if the VNF is down
+ check_if_process_failed(self._vnf_process)
number_of_ports = len(self.vnfd_helper.port_pairs.all_ports)
diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
index 5f1c4d4d3..c02c0eb27 100644
--- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
@@ -24,6 +24,7 @@ import posixpath
from six.moves import configparser, zip
+from yardstick.common.process import check_if_process_failed
from yardstick.network_services.helpers.samplevnf_helper import PortPairs
from yardstick.network_services.pipeline import PipelineRules
from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
@@ -278,6 +279,8 @@ class VpeApproxVnf(SampleVNF):
raise NotImplementedError
def collect_kpi(self):
+ # we can't get KPIs if the VNF is down
+ check_if_process_failed(self._vnf_process)
result = {
'pkt_in_up_stream': 0,
'pkt_drop_up_stream': 0,