diff options
author | Ross Brattain <ross.b.brattain@intel.com> | 2017-09-28 00:10:43 -0700 |
---|---|---|
committer | Ross Brattain <ross.b.brattain@intel.com> | 2017-10-04 08:31:17 -0700 |
commit | 8e35dc2c45ec1897690173b374406a05b2132954 (patch) | |
tree | e21fbaf51dd5d3aafe801798f5efa3e83be4b50d | |
parent | ddff9781c46fc446380a378cbca40d9576e32b68 (diff) |
NSB PROX test hang fixes
The PROX tests were hanging in the duration
runner.
These are fixes for various errors:
raise error in collect_kpi if VNF is down
move prox dpdk_rebind after collectd stop
fix dpdk nicbind rebind to group by drivers
prox: raise error in collect_kpi if the VNF is down
prox: add VNF_TYPE for consistency
sample_vnf: debug and fix kill_vnf
pkill is not matching some executable names,
add some debug process dumps and try switching
back to killall until we can find the issue
sample_vnf: add default timeout, so we can override
default 3600 SSH timeout
collect_kpi is the point at which we check
the VNFs and TGs for failures or exits
queues are the problem make sure we aren't silently blocking on
non-empty queues by canceling join thread in subprocess
fixup duration runner to close queues
and other attempt to stop duration runner
from hanging
VnfdHelper: memoize port_num
resource: fail if ssh can't connect
at the end of 3600 second test our ssh connection
is dead, so we can't actually stop collectd
unless we reconnect
fix stop() logic to ignore ssh errors
Change-Id: I6c8e682a80cb9d00362e2fef4a46df080f304e55
Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
30 files changed, 440 insertions, 130 deletions
diff --git a/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py b/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py index 5b15daca4..016608a21 100644 --- a/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py +++ b/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py @@ -445,6 +445,13 @@ class TestNetworkServiceTestCase(unittest.TestCase): self.assertIsNotNone( self.s.load_vnf_models(self.scenario_cfg, self.context_cfg)) + def test_load_vnf_models_no_model(self): + vnf = mock.Mock(autospec=GenericVNF) + self.s.get_vnf_impl = mock.Mock(return_value=vnf) + + self.assertIsNotNone( + self.s.load_vnf_models(self.scenario_cfg, self.context_cfg)) + def test_map_topology_to_infrastructure(self): with mock.patch("yardstick.ssh.SSH") as ssh: ssh_mock = mock.Mock(autospec=ssh.SSH) @@ -568,6 +575,35 @@ class TestNetworkServiceTestCase(unittest.TestCase): mock.Mock(return_value=TRAFFIC_PROFILE) self.assertEqual(None, self.s.setup()) + def test_setup_exception(self): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, SYS_CLASS_NET + IP_ADDR_SHOW, "")) + ssh.from_node.return_value = ssh_mock + + tgen = mock.Mock(autospec=GenericTrafficGen) + tgen.traffic_finished = True + verified_dict = {"verified": True} + tgen.verify_traffic = lambda x: verified_dict + tgen.terminate = mock.Mock(return_value=True) + tgen.name = "tgen__1" + vnf = mock.Mock(autospec=GenericVNF) + vnf.runs_traffic = False + vnf.instantiate.side_effect = RuntimeError("error during instantiate") + vnf.terminate = mock.Mock(return_value=True) + self.s.vnfs = [tgen, vnf] + self.s.traffic_profile = mock.Mock() + self.s.collector = mock.Mock(autospec=Collector) + self.s.collector.get_kpi = \ + mock.Mock(return_value={tgen.name: verified_dict}) + self.s.map_topology_to_infrastructure = mock.Mock(return_value=0) + self.s.load_vnf_models = mock.Mock(return_value=self.s.vnfs) + self.s._fill_traffic_profile = \ + mock.Mock(return_value=TRAFFIC_PROFILE) + with self.assertRaises(RuntimeError): + self.s.setup() + def test__get_traffic_profile(self): self.scenario_cfg["traffic_profile"] = \ self._get_file_abspath("ipv4_throughput_vpe.yaml") @@ -594,8 +630,8 @@ class TestNetworkServiceTestCase(unittest.TestCase): def test_teardown(self): vnf = mock.Mock(autospec=GenericVNF) - vnf.terminate = \ - mock.Mock(return_value=True) + vnf.terminate = mock.Mock(return_value=True) + vnf.name = str(vnf) self.s.vnfs = [vnf] self.s.traffic_profile = mock.Mock() self.s.collector = mock.Mock(autospec=Collector) @@ -603,6 +639,18 @@ class TestNetworkServiceTestCase(unittest.TestCase): mock.Mock(return_value=True) self.assertIsNone(self.s.teardown()) + def test_teardown_exception(self): + vnf = mock.Mock(autospec=GenericVNF) + vnf.terminate = mock.Mock(side_effect=RuntimeError("error duing terminate")) + vnf.name = str(vnf) + self.s.vnfs = [vnf] + self.s.traffic_profile = mock.Mock() + self.s.collector = mock.Mock(autospec=Collector) + self.s.collector.stop = \ + mock.Mock(return_value=True) + with self.assertRaises(RuntimeError): + self.s.teardown() + SAMPLE_NETDEVS = { 'enp11s0': { 'address': '0a:de:ad:be:ef:f5', diff --git a/tests/unit/common/test_process.py b/tests/unit/common/test_process.py new file mode 100644 index 000000000..5eee55bcc --- /dev/null +++ b/tests/unit/common/test_process.py @@ -0,0 +1,46 @@ +# Copyright (c) 2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest + +import mock + +from yardstick.common import process + + +class ProcessTestcase(unittest.TestCase): + def test_check_if_procces_failed_None(self): + p = mock.MagicMock(**{"exitcode": None, "name": "debug"}) + process.check_if_process_failed(p) + + def test_check_if_procces_failed_0(self): + p = mock.MagicMock(**{"exitcode": 0, "name": "debug"}) + process.check_if_process_failed(p) + + def test_check_if_procces_failed_1(self): + p = mock.MagicMock(**{"exitcode": 1, "name": "debug"}) + with self.assertRaises(RuntimeError): + process.check_if_process_failed(p) + + +@mock.patch("yardstick.common.process.multiprocessing") +class TerminateChildrenTestcase(unittest.TestCase): + def test_some_children(self, mock_multiprocessing): + p1 = mock.MagicMock() + p2 = mock.MagicMock() + mock_multiprocessing.active_children.return_value = [p1, p2] + process.terminate_children() + + def test_no_children(self, mock_multiprocessing): + mock_multiprocessing.active_children.return_value = [] + process.terminate_children() diff --git a/tests/unit/network_services/collector/test_subscriber.py b/tests/unit/network_services/collector/test_subscriber.py index 260f0bb27..f324f627d 100644 --- a/tests/unit/network_services/collector/test_subscriber.py +++ b/tests/unit/network_services/collector/test_subscriber.py @@ -45,6 +45,7 @@ class CollectorTestCase(unittest.TestCase): NODES = { 'node1': {}, 'node2': { + 'ip': '1.2.3.4', 'collectd': { 'plugins': {'abc': 12, 'def': 34}, 'interval': 987, diff --git a/tests/unit/network_services/helpers/test_dpdkbindnic_helper.py b/tests/unit/network_services/helpers/test_dpdkbindnic_helper.py index dbd8396c8..0f1cf7d92 100644 --- a/tests/unit/network_services/helpers/test_dpdkbindnic_helper.py +++ b/tests/unit/network_services/helpers/test_dpdkbindnic_helper.py @@ -16,19 +16,19 @@ import mock import unittest -from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper -from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelperException -from yardstick.network_services.helpers.dpdknicbind_helper import NETWORK_KERNEL -from yardstick.network_services.helpers.dpdknicbind_helper import NETWORK_DPDK -from yardstick.network_services.helpers.dpdknicbind_helper import CRYPTO_KERNEL -from yardstick.network_services.helpers.dpdknicbind_helper import CRYPTO_DPDK -from yardstick.network_services.helpers.dpdknicbind_helper import NETWORK_OTHER -from yardstick.network_services.helpers.dpdknicbind_helper import CRYPTO_OTHER +from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper +from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelperException +from yardstick.network_services.helpers.dpdkbindnic_helper import NETWORK_KERNEL +from yardstick.network_services.helpers.dpdkbindnic_helper import NETWORK_DPDK +from yardstick.network_services.helpers.dpdkbindnic_helper import CRYPTO_KERNEL +from yardstick.network_services.helpers.dpdkbindnic_helper import CRYPTO_DPDK +from yardstick.network_services.helpers.dpdkbindnic_helper import NETWORK_OTHER +from yardstick.network_services.helpers.dpdkbindnic_helper import CRYPTO_OTHER pass -class MyTestDpdkBindHelper(unittest.TestCase): +class TestDpdkBindHelper(unittest.TestCase): EXAMPLE_OUTPUT = """ Network devices using DPDK-compatible driver @@ -204,17 +204,31 @@ Other crypto devices def test_bind(self): conn = mock.Mock() conn.execute = mock.Mock(return_value=(0, '', '')) - conn.provision_tool = mock.Mock(return_value='/opt/nsb_bin/dpdk_nic_bind.py') + conn.provision_tool = mock.Mock(return_value='/opt/nsb_bin/dpdk-devbind.py') dpdk_bind_helper = DpdkBindHelper(conn) dpdk_bind_helper.read_status = mock.Mock() dpdk_bind_helper.bind(['0000:00:03.0', '0000:00:04.0'], 'my_driver') - conn.execute.assert_called_with('sudo /opt/nsb_bin/dpdk_nic_bind.py --force ' + conn.execute.assert_called_with('sudo /opt/nsb_bin/dpdk-devbind.py --force ' '-b my_driver 0000:00:03.0 0000:00:04.0') dpdk_bind_helper.read_status.assert_called_once() + def test_bind_single_pci(self): + conn = mock.Mock() + conn.execute = mock.Mock(return_value=(0, '', '')) + conn.provision_tool = mock.Mock(return_value='/opt/nsb_bin/dpdk-devbind.py') + + dpdk_bind_helper = DpdkBindHelper(conn) + dpdk_bind_helper.read_status = mock.Mock() + + dpdk_bind_helper.bind('0000:00:03.0', 'my_driver') + + conn.execute.assert_called_with('sudo /opt/nsb_bin/dpdk-devbind.py --force ' + '-b my_driver 0000:00:03.0') + dpdk_bind_helper.read_status.assert_called_once() + def test_rebind_drivers(self): conn = mock.Mock() @@ -222,14 +236,14 @@ Other crypto devices dpdk_bind_helper.bind = mock.Mock() dpdk_bind_helper.used_drivers = { - '0000:05:00.0': 'd1', - '0000:05:01.0': 'd3', + 'd1': ['0000:05:00.0'], + 'd3': ['0000:05:01.0', '0000:05:02.0'], } dpdk_bind_helper.rebind_drivers() - dpdk_bind_helper.bind.assert_any_call('0000:05:00.0', 'd1', True) - dpdk_bind_helper.bind.assert_any_call('0000:05:01.0', 'd3', True) + dpdk_bind_helper.bind.assert_any_call(['0000:05:00.0'], 'd1', True) + dpdk_bind_helper.bind.assert_any_call(['0000:05:01.0', '0000:05:02.0'], 'd3', True) def test_save_used_drivers(self): conn = mock.Mock() @@ -239,9 +253,8 @@ Other crypto devices dpdk_bind_helper.save_used_drivers() expected = { - '0000:00:04.0': 'igb_uio', - '0000:00:05.0': 'igb_uio', - '0000:00:03.0': 'virtio-pci', + 'igb_uio': ['0000:00:04.0', '0000:00:05.0'], + 'virtio-pci': ['0000:00:03.0'], } - self.assertEqual(expected, dpdk_bind_helper.used_drivers) + self.assertDictEqual(expected, dpdk_bind_helper.used_drivers) diff --git a/tests/unit/network_services/nfvi/test_resource.py b/tests/unit/network_services/nfvi/test_resource.py index 4fc6d7773..799cc202b 100644 --- a/tests/unit/network_services/nfvi/test_resource.py +++ b/tests/unit/network_services/nfvi/test_resource.py @@ -14,6 +14,8 @@ from __future__ import absolute_import import unittest + +import errno import mock from yardstick.network_services.nfvi.resource import ResourceProfile @@ -88,7 +90,7 @@ class TestResourceProfile(unittest.TestCase): with mock.patch("yardstick.ssh.AutoConnectSSH") as ssh: self.ssh_mock = mock.Mock(autospec=ssh.SSH) self.ssh_mock.execute = \ - mock.Mock(return_value=(0, {}, "")) + mock.Mock(return_value=(0, "", "")) ssh.from_node.return_value = self.ssh_mock mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] @@ -105,7 +107,12 @@ class TestResourceProfile(unittest.TestCase): def test_check_if_sa_running(self): self.assertEqual(self.resource_profile.check_if_sa_running("collectd"), - (True, {})) + (0, "")) + + def test_check_if_sa_running_excetion(self): + with mock.patch.object(self.resource_profile.connection, "execute") as mock_execute: + mock_execute.side_effect = OSError(errno.ECONNRESET, "error") + self.assertEqual(self.resource_profile.check_if_sa_running("collectd"), (1, None)) def test_get_cpu_data(self): reskey = ["", "cpufreq", "cpufreq-0"] @@ -127,7 +134,6 @@ class TestResourceProfile(unittest.TestCase): self.assertIsNone( self.resource_profile._prepare_collectd_conf("/opt/nsb_bin")) - @mock.patch("yardstick.network_services.nfvi.resource.open") @mock.patch("yardstick.network_services.nfvi.resource.os") def test__provide_config_file(self, mock_open, mock_os): @@ -141,13 +147,18 @@ class TestResourceProfile(unittest.TestCase): self.resource_profile._provide_config_file("/opt/nsb_bin", "collectd.conf", kwargs) self.ssh_mock.execute.assert_called_once() - @mock.patch("yardstick.network_services.nfvi.resource.open") def test_initiate_systemagent(self, mock_open): self.resource_profile._start_collectd = mock.Mock() self.assertIsNone( self.resource_profile.initiate_systemagent("/opt/nsb_bin")) + @mock.patch("yardstick.network_services.nfvi.resource.open") + def test_initiate_systemagent_raise(self, mock_open): + self.resource_profile._start_collectd = mock.Mock(side_effect=RuntimeError) + with self.assertRaises(RuntimeError): + self.resource_profile.initiate_systemagent("/opt/nsb_bin") + def test__parse_hugepages(self): reskey = ["cpu", "cpuFreq"] value = "timestamp:12345" @@ -173,7 +184,7 @@ class TestResourceProfile(unittest.TestCase): self.assertEqual({'ovs/stats': '45'}, res) def test_parse_collectd_result(self): - res = self.resource_profile.parse_collectd_result({}, [0, 1, 2]) + res = self.resource_profile.parse_collectd_result({}) expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {}, 'memory': {}, 'ovs_stats': {}, 'timestamp': '', 'intel_pmu': {}, @@ -186,7 +197,7 @@ class TestResourceProfile(unittest.TestCase): "ipc", "1234", ""]) - res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2]) + res = self.resource_profile.parse_collectd_result(metric) expected_result = {'cpu': {1: {'ipc': '1234'}}, 'dpdkstat': {}, 'hugepages': {}, 'memory': {}, 'ovs_stats': {}, 'timestamp': '', 'intel_pmu': {}, @@ -195,7 +206,7 @@ class TestResourceProfile(unittest.TestCase): def test_parse_collectd_result_memory(self): metric = {"nsb_stats/memory/bw": "101"} - res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2]) + res = self.resource_profile.parse_collectd_result(metric) expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {}, 'memory': {'bw': '101'}, 'ovs_stats': {}, 'timestamp': '', 'intel_pmu': {}, @@ -205,9 +216,8 @@ class TestResourceProfile(unittest.TestCase): def test_parse_collectd_result_hugepage(self): # amqp returns bytes metric = {b"nsb_stats/hugepages/free": b"101"} - self.resource_profile.parse_hugepages = \ - mock.Mock(return_value={"free": "101"}) - res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2]) + self.resource_profile.parse_hugepages = mock.Mock(return_value={"free": "101"}) + res = self.resource_profile.parse_collectd_result(metric) expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {'free': '101'}, 'memory': {}, 'ovs_stats': {}, 'timestamp': '', 'intel_pmu': {}, @@ -224,7 +234,7 @@ class TestResourceProfile(unittest.TestCase): mock.Mock(return_value={"memory": "101"}) self.resource_profile.parse_ovs_stats = \ mock.Mock(return_value={"tx": "101"}) - res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2]) + res = self.resource_profile.parse_collectd_result(metric) expected_result = {'cpu': {}, 'dpdkstat': {'tx': '101'}, 'hugepages': {}, 'memory': {}, 'ovs_stats': {'tx': '101'}, 'timestamp': '', 'intel_pmu': {}, @@ -258,5 +268,9 @@ class TestResourceProfile(unittest.TestCase): def test_stop(self): self.assertIsNone(self.resource_profile.stop()) + def test_stop(self): + self.resource_profile.amqp_client = mock.MagicMock() + self.assertIsNone(self.resource_profile.stop()) + if __name__ == '__main__': unittest.main() diff --git a/tests/unit/network_services/vnf_generic/vnf/test_cgnapt_vnf.py b/tests/unit/network_services/vnf_generic/vnf/test_cgnapt_vnf.py index 0a4c12446..b0ef1da91 100644 --- a/tests/unit/network_services/vnf_generic/vnf/test_cgnapt_vnf.py +++ b/tests/unit/network_services/vnf_generic/vnf/test_cgnapt_vnf.py @@ -41,7 +41,7 @@ TEST_FILE_YAML = 'nsb_test_case.yaml' SSH_HELPER = 'yardstick.network_services.vnf_generic.vnf.sample_vnf.VnfSshHelper' -name = 'vnf__1' +name = 'vnf__0' class TestCgnaptApproxSetupEnvHelper(unittest.TestCase): @@ -162,7 +162,7 @@ class TestCgnaptApproxVnf(unittest.TestCase): 'rfc2544': { 'allowed_drop_rate': '0.8 - 1', }, - 'vnf__1': { + 'vnf__0': { 'napt': 'dynamic', 'vnf_config': { 'lb_config': 'SW', @@ -172,6 +172,10 @@ class TestCgnaptApproxVnf(unittest.TestCase): 'worker_threads': 1, }, }, + 'flow': {'count': 1, + 'dst_ip': [{'tg__1': 'xe0'}], + 'public_ip': [''], + 'src_ip': [{'tg__0': 'xe0'}]}, }, 'task_id': 'a70bdf4a-8e67-47a3-9dc1-273c14506eb7', 'task_path': '/tmp', @@ -185,15 +189,11 @@ class TestCgnaptApproxVnf(unittest.TestCase): 'type': 'Duration', }, 'traffic_profile': 'ipv4_throughput_acl.yaml', - 'traffic_options': { - 'flow': 'ipv4_Packets_acl.yaml', - 'imix': 'imix_voice.yaml', - }, - 'type': 'ISB', + 'type': 'NSPerf', 'nodes': { - 'tg__2': 'trafficgen_2.yardstick', 'tg__1': 'trafficgen_1.yardstick', - 'vnf__1': 'vnf.yardstick', + 'tg__0': 'trafficgen_0.yardstick', + 'vnf__0': 'vnf.yardstick', }, 'topology': 'vpe-tg-topology-baremetal.yaml', } @@ -253,9 +253,9 @@ class TestCgnaptApproxVnf(unittest.TestCase): 'password': 'r00t', 'VNF model': 'tg_rfc2544_tpl.yaml', 'user': 'root'}, - 'vnf__1': + 'vnf__0': {'name': 'vnf.yardstick', - 'vnfd-id-ref': 'vnf__1', + 'vnfd-id-ref': 'vnf__0', 'ip': '1.2.1.1', 'interfaces': {'xe0': {'local_iface_name': 'ens786f0', @@ -318,6 +318,8 @@ class TestCgnaptApproxVnf(unittest.TestCase): vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0] cgnapt_approx_vnf = CgnaptApproxVnf(name, vnfd) + cgnapt_approx_vnf._vnf_process = mock.MagicMock( + **{"is_alive.return_value": True, "exitcode": None}) cgnapt_approx_vnf.q_in = mock.MagicMock() cgnapt_approx_vnf.q_out = mock.MagicMock() cgnapt_approx_vnf.q_out.qsize = mock.Mock(return_value=0) @@ -388,7 +390,7 @@ class TestCgnaptApproxVnf(unittest.TestCase): 'rules': ""}} cgnapt_approx_vnf.q_out.put("pipeline>") cgnapt_vnf.WAIT_TIME = 3 - self.scenario_cfg.update({"nodes": {"vnf__1": ""}}) + self.scenario_cfg.update({"nodes": {"vnf__0": ""}}) self.assertIsNone(cgnapt_approx_vnf.instantiate(self.scenario_cfg, self.context_cfg)) diff --git a/tests/unit/network_services/vnf_generic/vnf/test_prox_helpers.py b/tests/unit/network_services/vnf_generic/vnf/test_prox_helpers.py index e4319d602..54540ada1 100644 --- a/tests/unit/network_services/vnf_generic/vnf/test_prox_helpers.py +++ b/tests/unit/network_services/vnf_generic/vnf/test_prox_helpers.py @@ -17,12 +17,10 @@ from __future__ import absolute_import -import copy import os import socket import unittest from itertools import repeat, chain -from contextlib import contextmanager import mock from tests.unit import STL_MOCKS @@ -1501,7 +1499,9 @@ class TestProxResourceHelper(unittest.TestCase): helper = ProxResourceHelper(mock.MagicMock()) helper.resource = resource = mock.MagicMock() + resource.check_if_sa_running.return_value = 0, '1234' resource.amqp_collect_nfvi_kpi.return_value = 543 + resource.check_if_sa_running.return_value = (0, None) expected = {'core': 543} result = helper.collect_collectd_kpi() @@ -1513,7 +1513,9 @@ class TestProxResourceHelper(unittest.TestCase): helper._result = {'z': 123} helper.resource = resource = mock.MagicMock() + resource.check_if_sa_running.return_value = 0, '1234' resource.amqp_collect_nfvi_kpi.return_value = 543 + resource.check_if_sa_running.return_value = (0, None) queue.empty.return_value = False queue.get.return_value = {'a': 789} @@ -1800,7 +1802,7 @@ class TestProxProfileHelper(unittest.TestCase): def test_latency_cores(self): resource_helper = mock.MagicMock() - resource_helper.setup_helper.prox_config_data= [] + resource_helper.setup_helper.prox_config_data = [] helper = ProxProfileHelper(resource_helper) helper._cpu_topology = [] @@ -1977,7 +1979,6 @@ class TestProxProfileHelper(unittest.TestCase): ]), ] - client = mock.MagicMock() client.hz.return_value = 2 client.port_stats.return_value = tuple(range(12)) diff --git a/tests/unit/network_services/vnf_generic/vnf/test_prox_vnf.py b/tests/unit/network_services/vnf_generic/vnf/test_prox_vnf.py index c88b1528c..09060ff57 100644 --- a/tests/unit/network_services/vnf_generic/vnf/test_prox_vnf.py +++ b/tests/unit/network_services/vnf_generic/vnf/test_prox_vnf.py @@ -85,7 +85,7 @@ class TestProxApproxVnf(unittest.TestCase): 'vpci': '0000:05:00.0', 'local_ip': '152.16.100.19', 'type': 'PCI-PASSTHROUGH', - 'vld_id': '', + 'vld_id': 'downlink_0', 'ifname': 'xe1', 'netmask': '255.255.255.0', 'dpdk_port_num': 0, @@ -104,8 +104,8 @@ class TestProxApproxVnf(unittest.TestCase): 'vpci': '0000:05:00.1', 'local_ip': '152.16.40.19', 'type': 'PCI-PASSTHROUGH', - 'vld_id': '', - 'ifname': 'xe3', + 'vld_id': 'uplink_0', + 'ifname': 'xe1', 'driver': "i40e", 'netmask': '255.255.255.0', 'dpdk_port_num': 1, @@ -365,6 +365,7 @@ class TestProxApproxVnf(unittest.TestCase): prox_approx_vnf = ProxApproxVnf(NAME, deepcopy(self.VNFD0)) prox_approx_vnf.resource_helper = resource_helper prox_approx_vnf.vnfd_helper['vdu'][0]['external-interface'] = [] + prox_approx_vnf.vnfd_helper.port_pairs.interfaces = [] with self.assertRaises(RuntimeError): prox_approx_vnf.collect_kpi() diff --git a/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py b/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py index 77c92e0fb..4d53f0989 100644 --- a/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py +++ b/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py @@ -833,13 +833,13 @@ class TestDpdkVnfSetupEnvHelper(unittest.TestCase): dpdk_setup_helper = DpdkVnfSetupEnvHelper(vnfd_helper, ssh_helper, scenario_helper) dpdk_setup_helper.dpdk_bind_helper.bind = mock.Mock() dpdk_setup_helper.dpdk_bind_helper.used_drivers = { - '0000:05:00.0': 'd1', - '0000:05:01.0': 'd3', + 'd1': ['0000:05:00.0'], + 'd3': ['0000:05:01.0'], } self.assertIsNone(dpdk_setup_helper.tear_down()) - dpdk_setup_helper.dpdk_bind_helper.bind.assert_any_call('0000:05:00.0', 'd1', True) - dpdk_setup_helper.dpdk_bind_helper.bind.assert_any_call('0000:05:01.0', 'd3', True) + dpdk_setup_helper.dpdk_bind_helper.bind.assert_any_call(['0000:05:00.0'], 'd1', True) + dpdk_setup_helper.dpdk_bind_helper.bind.assert_any_call(['0000:05:01.0'], 'd3', True) class TestResourceHelper(unittest.TestCase): @@ -2130,6 +2130,7 @@ class TestSampleVNFTrafficGen(unittest.TestCase): def test_terminate(self): sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0) sample_vnf_tg._traffic_process = mock.Mock() + sample_vnf_tg._tg_process = mock.Mock() sample_vnf_tg.terminate() diff --git a/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py b/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py index bdda149c5..23d448c5e 100644 --- a/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py +++ b/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py @@ -426,6 +426,8 @@ class TestProxTrafficGen(unittest.TestCase): prox_traffic_gen._traffic_process.terminate = mock.Mock() prox_traffic_gen.ssh_helper = mock.MagicMock() prox_traffic_gen.setup_helper = mock.MagicMock() - prox_traffic_gen._vnf_wrapper.setup_helper = mock.MagicMock() prox_traffic_gen.resource_helper = mock.MagicMock() + prox_traffic_gen._vnf_wrapper.setup_helper = mock.MagicMock() + prox_traffic_gen._vnf_wrapper._vnf_process = mock.MagicMock() + prox_traffic_gen._vnf_wrapper.resource_helper = mock.MagicMock() self.assertEqual(None, prox_traffic_gen.terminate()) diff --git a/tests/unit/network_services/vnf_generic/vnf/test_udp_replay.py b/tests/unit/network_services/vnf_generic/vnf/test_udp_replay.py index b75ed6764..39b37c606 100644 --- a/tests/unit/network_services/vnf_generic/vnf/test_udp_replay.py +++ b/tests/unit/network_services/vnf_generic/vnf/test_udp_replay.py @@ -426,6 +426,7 @@ class TestUdpReplayApproxVnf(unittest.TestCase): udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0) udp_replay_approx_vnf._build_config = mock.MagicMock() udp_replay_approx_vnf.queue_wrapper = mock.MagicMock() + udp_replay_approx_vnf.scenario_helper = mock.MagicMock() udp_replay_approx_vnf._run() diff --git a/tests/unit/network_services/vnf_generic/vnf/test_vpe_vnf.py b/tests/unit/network_services/vnf_generic/vnf/test_vpe_vnf.py index 3813aaa21..4103d7825 100644 --- a/tests/unit/network_services/vnf_generic/vnf/test_vpe_vnf.py +++ b/tests/unit/network_services/vnf_generic/vnf/test_vpe_vnf.py @@ -565,8 +565,9 @@ class TestVpeApproxVnf(unittest.TestCase): mock_ssh(ssh) resource = mock.Mock(autospec=ResourceProfile) - resource.check_if_sa_running.return_value = False, 'error' + resource.check_if_sa_running.return_value = 1, '' resource.amqp_collect_nfvi_kpi.return_value = {'foo': 234} + resource.check_if_sa_running.return_value = (1, None) vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0) vpe_approx_vnf.q_in = mock.MagicMock() @@ -588,7 +589,7 @@ class TestVpeApproxVnf(unittest.TestCase): mock_ssh(ssh) resource = mock.Mock(autospec=ResourceProfile) - resource.check_if_sa_running.return_value = True, 'good' + resource.check_if_sa_running.return_value = 0, '1234' resource.amqp_collect_nfvi_kpi.return_value = {'foo': 234} vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0) diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 3ff064ae1..6aaaed888 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -191,7 +191,9 @@ class ArithmeticRunner(base.Runner): __execution_type__ = 'Arithmetic' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index 3ecf67736..13718d793 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -17,13 +17,14 @@ # rally/rally/benchmark/runners/base.py from __future__ import absolute_import -import importlib + import logging import multiprocessing import subprocess import time import traceback +import importlib from six.moves.queue import Empty @@ -36,7 +37,6 @@ log = logging.getLogger(__name__) def _execute_shell_command(command): """execute shell script with error handling""" exitcode = 0 - output = [] try: output = subprocess.check_output(command, shell=True) except Exception: diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index 2cb2600c8..fbf72a74c 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -138,7 +138,9 @@ If the scenario ends before the time has elapsed, it will be started again. __execution_type__ = 'Duration' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py index 01e76c6f4..63bfc823a 100755 --- a/yardstick/benchmark/runners/dynamictp.py +++ b/yardstick/benchmark/runners/dynamictp.py @@ -19,11 +19,12 @@ """A runner that searches for the max throughput with binary search """ -import os -import multiprocessing import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base @@ -65,8 +66,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, max_throuput_found = False sequence = 0 - last_min_data = {} - last_min_data['packets_per_second'] = 0 + last_min_data = {'packets_per_second': 0} while True: sequence += 1 @@ -125,7 +125,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, queue.put(record) max_throuput_found = True - if (errors) or aborted.is_set() or max_throuput_found: + if errors or aborted.is_set() or max_throuput_found: LOG.info("worker END") break @@ -155,7 +155,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, class IterationRunner(base.Runner): - '''Run a scenario to find the max throughput + """Run a scenario to find the max throughput If the scenario ends before the time has elapsed, it will be started again. @@ -168,11 +168,13 @@ If the scenario ends before the time has elapsed, it will be started again. type: int unit: pps default: 1000 pps - ''' + """ __execution_type__ = 'Dynamictp' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted)) diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 88158eed3..cb0424377 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -20,11 +20,13 @@ """ from __future__ import absolute_import -import os -import multiprocessing + import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base @@ -88,9 +90,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, scenario_cfg['options']['rate'] -= delta sequence = 1 continue - except Exception as e: + except Exception: errors = traceback.format_exc() - LOG.exception(e) + LOG.exception("") else: if result: # add timeout for put so we don't block test @@ -151,7 +153,9 @@ If the scenario ends before the time has elapsed, it will be started again. __execution_type__ = 'Iteration' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/runners/search.py b/yardstick/benchmark/runners/search.py index 5948763a7..8037329b5 100644 --- a/yardstick/benchmark/runners/search.py +++ b/yardstick/benchmark/runners/search.py @@ -20,15 +20,16 @@ """ from __future__ import absolute_import -import os -import multiprocessing + import logging -import traceback +import multiprocessing import time - -from collections import Mapping +import traceback from contextlib import contextmanager from itertools import takewhile + +import os +from collections import Mapping from six.moves import zip from yardstick.benchmark.runners import base @@ -173,7 +174,9 @@ If the scenario ends before the time has elapsed, it will be started again. break def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=self._worker_run, args=(cls, method, scenario_cfg, context_cfg)) self.process.start() diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index f08ca5dde..d6e3f7109 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -21,11 +21,13 @@ The input value in the sequence is specified in a list in the input file. """ from __future__ import absolute_import -import os -import multiprocessing + import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base @@ -140,7 +142,9 @@ class SequenceRunner(base.Runner): __execution_type__ = 'Sequence' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py index d9cc0eac1..d85125230 100644 --- a/yardstick/benchmark/scenarios/networking/vnf_generic.py +++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py @@ -30,6 +30,7 @@ from collections import defaultdict from yardstick.benchmark.scenarios import base from yardstick.common.constants import LOG_DIR +from yardstick.common.process import terminate_children from yardstick.common.utils import import_modules_from_package, itersubclasses from yardstick.common.yaml_loader import yaml_load from yardstick.network_services.collector.subscriber import Collector @@ -596,7 +597,8 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ vnf.instantiate(self.scenario_cfg, self.context_cfg) LOG.info("Waiting for %s to instantiate", vnf.name) vnf.wait_for_instantiate() - except RuntimeError: + except: + LOG.exception("") for vnf in self.vnfs: vnf.terminate() raise @@ -635,7 +637,19 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ :return """ - self.collector.stop() - for vnf in self.vnfs: - LOG.info("Stopping %s", vnf.name) - vnf.terminate() + try: + try: + self.collector.stop() + for vnf in self.vnfs: + LOG.info("Stopping %s", vnf.name) + vnf.terminate() + LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs)) + finally: + terminate_children() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise RuntimeError("Error in teardown") diff --git a/yardstick/common/process.py b/yardstick/common/process.py new file mode 100644 index 000000000..812ddea94 --- /dev/null +++ b/yardstick/common/process.py @@ -0,0 +1,47 @@ +# Copyright (c) 2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import multiprocessing + +import os + +LOG = logging.getLogger(__name__) + + +def check_if_process_failed(proc, timeout=1): + if proc is not None: + proc.join(timeout) + # Only abort if the process aborted + if proc.exitcode is not None and proc.exitcode > 0: + raise RuntimeError("{} exited with status {}".format(proc.name, proc.exitcode)) + + +def terminate_children(timeout=3): + current_proccess = multiprocessing.current_process() + active_children = multiprocessing.active_children() + if not active_children: + LOG.debug("no children to terminate") + return + for child in active_children: + LOG.debug("%s %s %s, child: %s %s", current_proccess.name, current_proccess.pid, + os.getpid(), child, child.pid) + LOG.debug("joining %s", child) + child.join(timeout) + child.terminate() + active_children = multiprocessing.active_children() + if not active_children: + LOG.debug("no children to terminate") + for child in active_children: + LOG.debug("%s %s %s, after terminate child: %s %s", current_proccess.name, + current_proccess.pid, os.getpid(), child, child.pid) diff --git a/yardstick/network_services/collector/subscriber.py b/yardstick/network_services/collector/subscriber.py index db52e0b45..d560e1d42 100644 --- a/yardstick/network_services/collector/subscriber.py +++ b/yardstick/network_services/collector/subscriber.py @@ -74,11 +74,12 @@ class Collector(object): # Result example: # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }} LOG.debug("collect KPI for %s", node_name) - if not resource.check_if_sa_running("collectd")[0]: + if resource.check_if_sa_running("collectd")[0] != 0: continue try: results[node_name] = {"core": resource.amqp_collect_nfvi_kpi()} + LOG.debug("%s collect KPIs %s", node_name, results[node_name]['core']) except Exception: LOG.exception("") return results diff --git a/yardstick/network_services/helpers/dpdknicbind_helper.py b/yardstick/network_services/helpers/dpdkbindnic_helper.py index 605d08d38..c07613147 100644 --- a/yardstick/network_services/helpers/dpdknicbind_helper.py +++ b/yardstick/network_services/helpers/dpdkbindnic_helper.py @@ -11,9 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging + import re import itertools +import six + NETWORK_KERNEL = 'network_kernel' NETWORK_DPDK = 'network_dpdk' NETWORK_OTHER = 'network_other' @@ -22,6 +26,9 @@ CRYPTO_DPDK = 'crypto_dpdk' CRYPTO_OTHER = 'crypto_other' +LOG = logging.getLogger(__name__) + + class DpdkBindHelperException(Exception): pass @@ -123,23 +130,31 @@ class DpdkBindHelper(object): @property def interface_driver_map(self): return {interface['vpci']: interface['driver'] - for interface in itertools.chain(*self.dpdk_status.values())} + for interface in itertools.chain.from_iterable(self.dpdk_status.values())} def read_status(self): return self.parse_dpdk_status_output(self._dpdk_execute(self._status_cmd)[1]) - def bind(self, pci, driver, force=True): + def bind(self, pci_addresses, driver, force=True): + # accept single PCI or list of PCI + if isinstance(pci_addresses, six.string_types): + pci_addresses = [pci_addresses] cmd = self.DPDK_BIND_CMD.format(dpdk_nic_bind=self._dpdk_nic_bind, driver=driver, - vpci=' '.join(list(pci)), + vpci=' '.join(list(pci_addresses)), force='--force' if force else '') + LOG.debug(cmd) self._dpdk_execute(cmd) # update the inner status dict self.read_status() def save_used_drivers(self): - self.used_drivers = self.interface_driver_map + # invert the map, so we can bind by driver type + self.used_drivers = {} + # sort for stabililty + for vpci, driver in sorted(self.interface_driver_map.items()): + self.used_drivers.setdefault(driver, []).append(vpci) def rebind_drivers(self, force=True): - for vpci, driver in self.used_drivers.items(): - self.bind(vpci, driver, force) + for driver, vpcis in self.used_drivers.items(): + self.bind(vpcis, driver, force) diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py index 7e8334c73..e3d0e3bca 100644 --- a/yardstick/network_services/nfvi/resource.py +++ b/yardstick/network_services/nfvi/resource.py @@ -19,6 +19,7 @@ from __future__ import print_function import logging from itertools import chain +import errno import jinja2 import os import os.path @@ -72,7 +73,6 @@ class ResourceProfile(object): self.timeout = timeout self.enable = True - self.cores = validate_non_string_sequence(cores, default=[]) self._queue = multiprocessing.Queue() self.amqp_client = None self.port_names = validate_non_string_sequence(port_names, default=[]) @@ -83,8 +83,16 @@ class ResourceProfile(object): def check_if_sa_running(self, process): """ verify if system agent is running """ - status, pid, _ = self.connection.execute("pgrep -f %s" % process) - return status == 0, pid + try: + err, pid, _ = self.connection.execute("pgrep -f %s" % process) + # strip whitespace + return err, pid.strip() + except OSError as e: + if e.errno in {errno.ECONNRESET}: + # if we can't connect to check, then we won't be able to connect to stop it + LOG.exception("can't connect to host to check collectd status") + return 1, None + raise def run_collectd_amqp(self): """ run amqp consumer to collect the NFVi data """ @@ -137,7 +145,7 @@ class ResourceProfile(object): def parse_intel_pmu_stats(cls, key, value): return {''.join(str(v) for v in key): value.split(":")[1]} - def parse_collectd_result(self, metrics, core_list): + def parse_collectd_result(self, metrics): """ convert collectd data into json""" result = { "cpu": {}, @@ -161,8 +169,7 @@ class ResourceProfile(object): if "cpu" in res_key0 or "intel_rdt" in res_key0: cpu_key, name, metric, testcase = \ self.get_cpu_data(res_key0, res_key1, value) - if cpu_key in core_list: - result["cpu"].setdefault(cpu_key, {}).update({name: metric}) + result["cpu"].setdefault(cpu_key, {}).update({name: metric}) elif "memory" in res_key0: result["memory"].update({res_key1: value.split(":")[0]}) @@ -189,8 +196,9 @@ class ResourceProfile(object): def amqp_process_for_nfvi_kpi(self): """ amqp collect and return nfvi kpis """ if self.amqp_client is None and self.enable: - self.amqp_client = \ - multiprocessing.Process(target=self.run_collectd_amqp) + self.amqp_client = multiprocessing.Process( + name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()), + target=self.run_collectd_amqp) self.amqp_client.start() def amqp_collect_nfvi_kpi(self): @@ -201,7 +209,7 @@ class ResourceProfile(object): metric = {} while not self._queue.empty(): metric.update(self._queue.get()) - msg = self.parse_collectd_result(metric, self.cores) + msg = self.parse_collectd_result(metric) return msg def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs): @@ -265,8 +273,8 @@ class ResourceProfile(object): connection.execute("sudo rabbitmqctl authenticate_user admin admin") connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'") - LOG.debug("Start collectd service.....") - connection.execute("sudo %s" % collectd_path) + LOG.debug("Start collectd service..... %s second timeout", self.timeout) + connection.execute("sudo %s" % collectd_path, timeout=self.timeout) LOG.debug("Done") def initiate_systemagent(self, bin_path): @@ -292,13 +300,18 @@ class ResourceProfile(object): LOG.debug("Stop resource monitor...") if self.amqp_client is not None: + # we proper and try to join first + self.amqp_client.join(3) self.amqp_client.terminate() + LOG.debug("Check if %s is running", agent) status, pid = self.check_if_sa_running(agent) - if status == 0: + LOG.debug("status %s pid %s", status, pid) + if status != 0: return - self.connection.execute('sudo kill -9 %s' % pid) - self.connection.execute('sudo pkill -9 %s' % agent) + if pid: + self.connection.execute('sudo kill -9 "%s"' % pid) + self.connection.execute('sudo pkill -9 "%s"' % agent) self.connection.execute('sudo service rabbitmq-server stop') self.connection.execute("sudo rabbitmqctl stop_app") diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 67634a79c..7391633af 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -64,6 +64,8 @@ class VnfdHelper(dict): def __init__(self, *args, **kwargs): super(VnfdHelper, self).__init__(*args, **kwargs) self.port_pairs = PortPairs(self['vdu'][0]['external-interface']) + # port num is not present until binding so we have to memoize + self._port_num_map = {} @property def mgmt_interface(self): @@ -91,12 +93,14 @@ class VnfdHelper(dict): virtual_intf = interface["virtual-interface"] if virtual_intf[key] == value: return interface + raise KeyError() def find_interface(self, **kwargs): key, value = next(iter(kwargs.items())) for interface in self.interfaces: if interface[key] == value: return interface + raise KeyError() # hide dpdk_port_num key so we can abstract def find_interface_by_port(self, port): @@ -105,6 +109,7 @@ class VnfdHelper(dict): # we have to convert to int to compare if int(virtual_intf['dpdk_port_num']) == port: return interface + raise KeyError() def port_num(self, port): # we need interface name -> DPDK port num (PMD ID) -> LINK ID @@ -118,7 +123,8 @@ class VnfdHelper(dict): intf = port else: intf = self.find_interface(name=port) - return int(intf["virtual-interface"]["dpdk_port_num"]) + return self._port_num_map.setdefault(intf["name"], + int(intf["virtual-interface"]["dpdk_port_num"])) def port_nums(self, intfs): return [self.port_num(i) for i in intfs] diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index ba4d44c41..d9fe9a948 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -364,6 +364,7 @@ class ProxSocketHelper(object): """ send data to the remote instance """ LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n')) try: + # TODO: sendall will block, we need a timeout self._sock.sendall(to_send.encode('utf-8')) except: pass @@ -593,6 +594,8 @@ _LOCAL_OBJECT = object() class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): # the actual app is lowercase APP_NAME = 'prox' + # not used for Prox but added for consistency + VNF_TYPE = "PROX" LUA_PARAMETER_NAME = "" LUA_PARAMETER_PEER = { @@ -609,6 +612,8 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): self._prox_config_data = None self.additional_files = {} self.config_queue = Queue() + # allow_exit_without_flush + self.config_queue.cancel_join_thread() self._global_section = None @property diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py index 2ac6ea412..3bfca19aa 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py @@ -16,9 +16,10 @@ import errno import logging +from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxDpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper -from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, PROCESS_JOIN_TIMEOUT LOG = logging.getLogger(__name__) @@ -51,10 +52,18 @@ class ProxApproxVnf(SampleVNF): try: return self.resource_helper.execute(cmd, *args, **kwargs) except OSError as e: - if not ignore_errors or e.errno not in {errno.EPIPE, errno.ESHUTDOWN}: + if e.errno in {errno.EPIPE, errno.ESHUTDOWN, errno.ECONNRESET}: + if ignore_errors: + LOG.debug("ignoring vnf_execute exception %s for command %s", e, cmd) + else: + raise + else: raise def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) + if self.resource_helper is None: result = { "packets_in": 0, @@ -64,12 +73,13 @@ class ProxApproxVnf(SampleVNF): } return result - intf_count = len(self.vnfd_helper.interfaces) - if intf_count not in {1, 2, 4}: + # use all_ports so we only use ports matched in topology + port_count = len(self.vnfd_helper.port_pairs.all_ports) + if port_count not in {1, 2, 4}: raise RuntimeError("Failed ..Invalid no of ports .. " "1, 2 or 4 ports only supported at this time") - port_stats = self.vnf_execute('port_stats', range(intf_count)) + port_stats = self.vnf_execute('port_stats', range(port_count)) try: rx_total = port_stats[6] tx_total = port_stats[7] @@ -94,13 +104,20 @@ class ProxApproxVnf(SampleVNF): self.setup_helper.tear_down() def terminate(self): + # stop collectd first or we get pika errors? + self.resource_helper.stop_collect() # try to quit with socket commands - self.vnf_execute("stop_all") - self.vnf_execute("quit") - # hopefully quit succeeds and socket closes, so ignore force_quit socket errors - self.vnf_execute("force_quit", _ignore_errors=True) - if self._vnf_process: - self._vnf_process.terminate() + # pkill is not matching, debug with pgrep + self.ssh_helper.execute("sudo pgrep -lax %s" % self.setup_helper.APP_NAME) + self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.setup_helper.APP_NAME) + if self._vnf_process.is_alive(): + self.vnf_execute("stop_all") + self.vnf_execute("quit") + # hopefully quit succeeds and socket closes, so ignore force_quit socket errors + self.vnf_execute("force_quit", _ignore_errors=True) self.setup_helper.kill_vnf() self._tear_down() - self.resource_helper.stop_collect() + if self._vnf_process is not None: + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index bbaae39e3..114153dab 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -29,10 +29,11 @@ from six.moves import cStringIO from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file +from yardstick.common.process import check_if_process_failed from yardstick.network_services.helpers.cpu import CpuSysCores from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig -from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper +from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper from yardstick.network_services.nfvi.resource import ResourceProfile from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper @@ -51,6 +52,8 @@ LOG = logging.getLogger(__name__) REMOTE_TMP = "/tmp" +DEFAULT_VNF_TIMEOUT = 3600 +PROCESS_JOIN_TIMEOUT = 3 class VnfSshHelper(AutoConnectSSH): @@ -290,8 +293,12 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): return resource def kill_vnf(self): + # pkill is not matching, debug with pgrep + self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME) + self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME) # have to use exact match - self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME) + # try using killall to match + self.ssh_helper.execute("sudo killall %s" % self.APP_NAME) def _setup_dpdk(self): """ setup dpdk environment needed for vnf to run """ @@ -330,8 +337,10 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): port_names = (intf["name"] for intf in ports) collectd_options = self.get_collectd_options() plugins = collectd_options.get("plugins", {}) + # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores, - plugins=plugins, interval=collectd_options.get("interval")) + plugins=plugins, interval=collectd_options.get("interval"), + timeout=self.scenario_helper.timeout) def _detect_and_bind_drivers(self): interfaces = self.vnfd_helper.interfaces @@ -386,7 +395,7 @@ class ResourceHelper(object): def _collect_resource_kpi(self): result = {} status = self.resource.check_if_sa_running("collectd")[0] - if status: + if status == 0: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} @@ -672,12 +681,18 @@ class ScenarioHelper(object): def topology(self): return self.scenario_cfg['topology'] + @property + def timeout(self): + return self.options.get('timeout', DEFAULT_VNF_TIMEOUT) + class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ VNF_PROMPT = "pipeline>" WAIT_TIME = 1 + APP_NAME = "SampleVNF" + # we run the VNF interactively, so the ssh command will timeout after this long def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNF, self).__init__(name, vnfd) @@ -760,7 +775,8 @@ class SampleVNF(GenericVNF): def _start_vnf(self): self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) - self._vnf_process = Process(target=self._run) + name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._vnf_process = Process(name=name, target=self._run) self._vnf_process.start() def _vnf_up_post(self): @@ -810,6 +826,7 @@ class SampleVNF(GenericVNF): 'stdout': self.queue_wrapper, 'keep_stdin_open': True, 'pty': True, + 'timeout': self.scenario_helper.timeout, } def _build_config(self): @@ -842,11 +859,15 @@ class SampleVNF(GenericVNF): def terminate(self): self.vnf_execute("quit") - if self._vnf_process: - self._vnf_process.terminate() self.setup_helper.kill_vnf() self._tear_down() self.resource_helper.stop_collect() + if self._vnf_process is not None: + # be proper and join first before we kill + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() + # no terminate children here because we share processes with tg def get_stats(self, *args, **kwargs): """ @@ -860,6 +881,8 @@ class SampleVNF(GenericVNF): return out def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) if m: @@ -884,7 +907,6 @@ class SampleVNFTrafficGen(GenericTrafficGen): def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNFTrafficGen, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') - self.name = "tgen__1" # name in topology file self.scenario_helper = ScenarioHelper(self.name) self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True) @@ -916,7 +938,8 @@ class SampleVNFTrafficGen(GenericTrafficGen): self.resource_helper.setup() LOG.info("Starting %s server...", self.APP_NAME) - self._tg_process = Process(target=self._start_server) + name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._tg_process = Process(name=name, target=self._start_server) self._tg_process.start() def wait_for_instantiate(self): @@ -952,7 +975,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - self._traffic_process = Process(target=self._traffic_runner, + name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + os.getpid()) + self._traffic_process = Process(name=name, target=self._traffic_runner, args=(traffic_profile,)) self._traffic_process.start() # Wait for traffic process to start @@ -983,6 +1008,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): pass def collect_kpi(self): + # check if the tg processes have exited + for proc in (self._tg_process, self._traffic_process): + check_if_process_failed(proc) result = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -993,5 +1021,15 @@ class SampleVNFTrafficGen(GenericTrafficGen): :return: True/False """ self.traffic_finished = True + # we must kill client before we kill the server, or the client will raise exception if self._traffic_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._traffic_process.name) + self._traffic_process.join(PROCESS_JOIN_TIMEOUT) self._traffic_process.terminate() + if self._tg_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._tg_process.name) + self._tg_process.join(PROCESS_JOIN_TIMEOUT) + self._tg_process.terminate() + # no terminate children here because we share processes with vnf diff --git a/yardstick/network_services/vnf_generic/vnf/udp_replay.py b/yardstick/network_services/vnf_generic/vnf/udp_replay.py index 6b7779782..a57f53bc7 100644 --- a/yardstick/network_services/vnf_generic/vnf/udp_replay.py +++ b/yardstick/network_services/vnf_generic/vnf/udp_replay.py @@ -15,6 +15,7 @@ from __future__ import absolute_import import logging +from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper @@ -107,6 +108,8 @@ class UdpReplayApproxVnf(SampleVNF): def collect_kpi(self): def get_sum(offset): return sum(int(i) for i in split_stats[offset::5]) + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) number_of_ports = len(self.vnfd_helper.port_pairs.all_ports) diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index 5f1c4d4d3..c02c0eb27 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -24,6 +24,7 @@ import posixpath from six.moves import configparser, zip +from yardstick.common.process import check_if_process_failed from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.pipeline import PipelineRules from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper @@ -278,6 +279,8 @@ class VpeApproxVnf(SampleVNF): raise NotImplementedError def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) result = { 'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0, |