From b3d72298b576651430121d6d36e1956fd6a689dc Mon Sep 17 00:00:00 2001 From: Deepak S Date: Tue, 20 Jun 2017 14:24:26 -0700 Subject: Collectd Change-Id: I15e4ac38b347a08350b71c68469e2793eeed92ab Signed-off-by: Deepak S Signed-off-by: Edward MacGillivray Signed-off-by: Ross Brattain --- tests/unit/network_services/nfvi/test_resource.py | 220 +++++++++++++++++++- yardstick/network_services/nfvi/collectd.conf | 59 ++++-- yardstick/network_services/nfvi/collectd.sh | 73 ++++++- yardstick/network_services/nfvi/resource.py | 227 +++++++++++++++------ .../network_services/vnf_generic/vnf/sample_vnf.py | 3 +- 5 files changed, 483 insertions(+), 99 deletions(-) diff --git a/tests/unit/network_services/nfvi/test_resource.py b/tests/unit/network_services/nfvi/test_resource.py index e2640ac74..cb26fd085 100644 --- a/tests/unit/network_services/nfvi/test_resource.py +++ b/tests/unit/network_services/nfvi/test_resource.py @@ -92,9 +92,11 @@ class TestResourceProfile(unittest.TestCase): mock.Mock(return_value=(0, {}, "")) ssh.from_node.return_value = ssh_mock + mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + interfaces = \ + self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface'] self.resource_profile = \ - ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0], - [1, 2, 3]) + ResourceProfile(mgmt, interfaces, [1, 2, 3]) def test___init__(self): self.assertEqual(True, self.resource_profile.enable) @@ -107,13 +109,13 @@ class TestResourceProfile(unittest.TestCase): reskey = ["", "cpufreq", "cpufreq-0"] value = "metric:10" val = self.resource_profile.get_cpu_data(reskey, value) - self.assertEqual(val, ['0', 'cpufreq', '10', 'metric']) + self.assertIsNotNone(val) def test_get_cpu_data_error(self): reskey = ["", "", ""] value = "metric:10" val = self.resource_profile.get_cpu_data(reskey, value) - self.assertEqual(val, ['error', 'Invalid', '']) + self.assertEqual(val, ('error', 'Invalid', '', '')) def test__start_collectd(self): with mock.patch("yardstick.ssh.SSH") as ssh: @@ -121,32 +123,228 @@ class TestResourceProfile(unittest.TestCase): ssh_mock.execute = \ mock.Mock(return_value=(0, "", "")) ssh.from_node.return_value = ssh_mock + mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + interfaces = \ + self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface'] resource_profile = \ - ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0], - [1, 2, 3]) + ResourceProfile(mgmt, interfaces, [1, 2, 3]) + resource_profile._prepare_collectd_conf = mock.Mock() self.assertIsNone( resource_profile._start_collectd(ssh_mock, "/opt/nsb_bin")) - def test_initiate_systemagent(self): + def test__prepare_collectd_conf_BM(self): with mock.patch("yardstick.ssh.SSH") as ssh: ssh_mock = mock.Mock(autospec=ssh.SSH) ssh_mock.execute = \ mock.Mock(return_value=(0, "", "")) ssh.from_node.return_value = ssh_mock + mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + interfaces = \ + self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface'] resource_profile = \ - ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0], - [1, 2, 3]) + ResourceProfile(mgmt, interfaces, [1, 2, 3]) + resource_profile._provide_config_file = mock.Mock() + self.assertIsNone( + resource_profile._prepare_collectd_conf("/opt/nsb_bin")) + + def test__prepare_collectd_conf_managed_ovs_dpdk(self): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, "", "")) + ssh.from_node.return_value = ssh_mock + mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + interfaces = \ + self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface'] + resource_profile = \ + ResourceProfile(mgmt, interfaces, [1, 2, 3]) + resource_profile._provide_config_file = mock.Mock() + self.assertIsNone( + resource_profile._prepare_collectd_conf("/opt/nsb_bin")) + + def test__prepare_collectd_conf_ovs_dpdk(self): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, "", "")) + ssh.from_node.return_value = ssh_mock + mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + interfaces = \ + self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface'] + resource_profile = \ + ResourceProfile(mgmt, interfaces, [1, 2, 3]) + resource_profile._provide_config_file = mock.Mock() + self.assertIsNone( + resource_profile._prepare_collectd_conf("/opt/nsb_bin")) + + def test__prepare_collectd_conf_managed_sriov(self): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, "", "")) + ssh.from_node.return_value = ssh_mock + mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + interfaces = \ + self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface'] + resource_profile = \ + ResourceProfile(mgmt, interfaces, [1, 2, 3]) + resource_profile._provide_config_file = mock.Mock() + self.assertIsNone( + resource_profile._prepare_collectd_conf("/opt/nsb_bin")) + + def test__prepare_collectd_conf_sriov(self): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, "", "")) + ssh.from_node.return_value = ssh_mock + mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + interfaces = \ + self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface'] + resource_profile = \ + ResourceProfile(mgmt, interfaces, [1, 2, 3]) + resource_profile._provide_config_file = mock.Mock() + self.assertIsNone( + resource_profile._prepare_collectd_conf("/opt/nsb_bin")) + + @mock.patch("yardstick.network_services.nfvi.resource.open") + @mock.patch("yardstick.network_services.nfvi.resource.tempfile") + @mock.patch("yardstick.network_services.nfvi.resource.os") + def test__provide_config_file(self, mock_open, mock_tempfile, mock_os): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, "", "")) + ssh.from_node.return_value = ssh_mock + mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + interfaces = \ + self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface'] + resource_profile = \ + ResourceProfile(mgmt, interfaces, [1, 2, 3]) + resource_profile._prepare_collectd_conf = mock.Mock() + resource_profile.connection = ssh_mock + resource_profile.connection.put = \ + mock.Mock(return_value=(0, "", "")) + mock_tempfile.mkstemp = mock.Mock(return_value=["test", ""]) + self.assertIsNone( + resource_profile._provide_config_file("/opt/nsb_bin", + "collectd.cfg", {})) + + @mock.patch("yardstick.network_services.nfvi.resource.open") + def test_initiate_systemagent(self, mock_open): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, "", "")) + ssh.from_node.return_value = ssh_mock + mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + interfaces = \ + self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface'] + resource_profile = \ + ResourceProfile(mgmt, interfaces, [1, 2, 3]) + resource_profile._start_collectd = mock.Mock() self.assertIsNone( resource_profile.initiate_systemagent("/opt/nsb_bin")) + def test__parse_hugepages(self): + reskey = ["cpu", "cpuFreq"] + value = "timestamp:12345" + res = self.resource_profile.parse_hugepages(reskey, value) + self.assertEqual({'cpu/cpuFreq': '12345'}, res) + + def test__parse_dpdkstat(self): + reskey = ["dpdk0", "0"] + value = "tx:12345" + res = self.resource_profile.parse_dpdkstat(reskey, value) + self.assertEqual({'dpdk0/0': '12345'}, res) + + def test__parse_virt(self): + reskey = ["vm0", "cpu"] + value = "load:45" + res = self.resource_profile.parse_virt(reskey, value) + self.assertEqual({'vm0/cpu': '45'}, res) + + def test__parse_ovs_stats(self): + reskey = ["ovs", "stats"] + value = "tx:45" + res = self.resource_profile.parse_ovs_stats(reskey, value) + self.assertEqual({'ovs/stats': '45'}, res) + def test_parse_collectd_result(self): res = self.resource_profile.parse_collectd_result({}, [0, 1, 2]) - self.assertDictEqual(res, {'timestamp': '', 'cpu': {}, 'memory': {}}) + expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {}, + 'memory': {}, 'ovs_stats': {}, 'timestamp': '', + 'virt': {}} + self.assertDictEqual(res, expected_result) + + def test_parse_collectd_result_cpu(self): + metric = {"nsb_stats/cpu/0/ipc": "101"} + self.resource_profile.get_cpu_data = mock.Mock(return_value=[1, + "ipc", + "1234", + ""]) + res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2]) + expected_result = {'cpu': {1: {'ipc': '1234'}}, 'dpdkstat': {}, 'hugepages': {}, + 'memory': {}, 'ovs_stats': {}, 'timestamp': '', + 'virt': {}} + self.assertDictEqual(res, expected_result) + + def test_parse_collectd_result_memory(self): + metric = {"nsb_stats/memory/bw": "101"} + res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2]) + expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {}, + 'memory': {'bw': '101'}, 'ovs_stats': {}, 'timestamp': '', + 'virt': {}} + self.assertDictEqual(res, expected_result) + + def test_parse_collectd_result_hugepage(self): + metric = {"nsb_stats/hugepages/free": "101"} + self.resource_profile.parse_hugepages = \ + mock.Mock(return_value={"free": "101"}) + res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2]) + expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {'free': + '101'}, + 'memory': {}, 'ovs_stats': {}, 'timestamp': '', + 'virt': {}} + self.assertDictEqual(res, expected_result) + + def test_parse_collectd_result_dpdk_virt_ovs(self): + metric = {"nsb_stats/dpdkstat/tx": "101", + "nsb_stats/ovs_stats/tx": "101", + "nsb_stats/virt/virt/memory": "101"} + self.resource_profile.parse_dpdkstat = \ + mock.Mock(return_value={"tx": "101"}) + self.resource_profile.parse_virt = \ + mock.Mock(return_value={"memory": "101"}) + self.resource_profile.parse_ovs_stats = \ + mock.Mock(return_value={"tx": "101"}) + res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2]) + expected_result = {'cpu': {}, 'dpdkstat': {'tx': '101'}, 'hugepages': {}, + 'memory': {}, 'ovs_stats': {'tx': '101'}, 'timestamp': '', + 'virt': {'memory': '101'}} + self.assertDictEqual(res, expected_result) + + def test_amqp_process_for_nfvi_kpi(self): + self.resource_profile.amqp_client = \ + mock.MagicMock(side_effect=[None, mock.MagicMock()]) + self.resource_profile.run_collectd_amqp = \ + mock.Mock(return_value=0) + res = self.resource_profile.amqp_process_for_nfvi_kpi() + self.assertEqual(None, res) + + def test_amqp_collect_nfvi_kpi(self): + self.resource_profile.amqp_client = \ + mock.MagicMock(side_effect=[None, mock.MagicMock()]) + self.resource_profile.run_collectd_amqp = \ + mock.Mock(return_value=0) + self.resource_profile.parse_collectd_result = mock.Mock() + res = self.resource_profile.amqp_collect_nfvi_kpi() + self.assertIsNotNone(res) def test_run_collectd_amqp(self): _queue = multiprocessing.Queue() resource.AmqpConsumer = mock.Mock(autospec=collectd) - self.assertIsNone(self.resource_profile.run_collectd_amqp(_queue)) + self.assertIsNone(self.resource_profile.run_collectd_amqp()) def test_start(self): self.assertIsNone(self.resource_profile.start()) diff --git a/yardstick/network_services/nfvi/collectd.conf b/yardstick/network_services/nfvi/collectd.conf index abcf24ded..6d8b73f7f 100644 --- a/yardstick/network_services/nfvi/collectd.conf +++ b/yardstick/network_services/nfvi/collectd.conf @@ -15,7 +15,7 @@ Hostname "nsb_stats" FQDNLookup true -Interval 5 +Interval {interval} ############################################################################## # LoadPlugin section # @@ -23,10 +23,8 @@ Interval 5 # Specify what features to activate. # ############################################################################## -LoadPlugin amqp -LoadPlugin cpu -LoadPlugin intel_rdt -LoadPlugin memory +#LoadPlugin syslog +{loadplugin} ############################################################################## # Plugin configuration # @@ -35,6 +33,10 @@ LoadPlugin memory # ription of those options is available in the collectd.conf(5) manual page. # ############################################################################## +# +# LogLevel debug +# + Host "0.0.0.0" @@ -53,7 +55,7 @@ LoadPlugin memory ReportByCpu true ReportByState true - ValuesPercentage false + ValuesPercentage true @@ -61,18 +63,47 @@ LoadPlugin memory ValuesPercentage false - - Interval 5 - Cores "" - - - Host "127.0.0.1" - Port "11211" - + + ReportPerNodeHP true + ReportRootHP true + ValuesPages true + ValuesBytes false + ValuesPercentage false + + + + ReportPerNodeHP true + ReportRootHP true + ValuesPages true + ValuesBytes false + ValuesPercentage false + + + + + Coremask "0x1" + MemoryChannels "4" + ProcessType "secondary" + FilePrefix "rte" + + SharedMemObj "dpdk_collectd_stats_0" + EnabledPortMask 0xffff + {dpdk_interface} + + + + Domain "samplevnf" + + + + Port "6640" + Address "127.0.0.1" + Socket "/usr/local/var/run/openvswitch/db.sock" + Bridges "br0" "br_ext" diff --git a/yardstick/network_services/nfvi/collectd.sh b/yardstick/network_services/nfvi/collectd.sh index 7acb40431..8162ec539 100755 --- a/yardstick/network_services/nfvi/collectd.sh +++ b/yardstick/network_services/nfvi/collectd.sh @@ -22,8 +22,20 @@ if [ "$(whoami)" != "root" ]; then exit 1; fi +echo "setup proxy..." +http_proxy=$1 +https_proxy=$2 +if [[ "$http_proxy" != "" ]]; then + export http_proxy=$http_proxy + export https_proxy=$http_proxy +fi + +if [[ "$https_proxy" != "" ]]; then + export https_proxy=$https_proxy +fi + echo "Install required libraries to run collectd..." -pkg=(git flex bison build-essential pkg-config automake autotools-dev libltdl-dev librabbitmq-dev rabbitmq-server) +pkg=(git flex bison build-essential pkg-config automake autotools-dev libltdl-dev librabbitmq-dev rabbitmq-server cmake) for i in "${pkg[@]}"; do dpkg-query -W --showformat='${Status}\n' "${i}"|grep "install ok installed" if [ "$?" -eq "1" ]; then @@ -43,7 +55,6 @@ else rm -rf intel-cmt-cat >/dev/null git clone https://github.com/01org/intel-cmt-cat.git pushd intel-cmt-cat - git checkout tags/v1.5 -b v1.5 make install PREFIX=/usr popd @@ -51,7 +62,59 @@ else echo "Done." fi -which /opt/nsb_bin/collectd/collectd >/dev/null +ls /usr/lib/libdpdk.so >/dev/null +if [ $? -eq 0 ] +then + echo "DPDK already installed. Done" +else + pushd . + + echo "Get dpdk and install..." + mkdir -p $INSTALL_NSB_BIN + rm -rf "$INSTALL_NSB_BIN"/dpdk >/dev/null + git clone http://dpdk.org/git/dpdk + pushd dpdk + mkdir -p /mnt/huge + mount -t hugetlbfs nodev /mnt/huge + sed -i 's/CONFIG_RTE_BUILD_SHARED_LIB=n/CONFIG_RTE_BUILD_SHARED_LIB=y/g' config/common_base + sed -i 's/CONFIG_RTE_EAL_PMD_PATH=""/CONFIG_RTE_EAL_PMD_PATH="\/usr\/lib\/dpdk-pmd\/"/g' config/common_base + + echo "Build dpdk v16.04" + make config T=x86_64-native-linuxapp-gcc + make + sudo make install prefix=/usr + mkdir -p /usr/lib/dpdk-pmd + find /usr/lib -type f -name 'librte_pmd*' | while read path ; do ln -s $path /usr/lib/dpdk-pmd/`echo $path | grep -o 'librte_.*so'` ; done + + echo "Disable ASLR." + echo 0 > /proc/sys/kernel/randomize_va_space + make install PREFIX=/usr + popd + + popd + echo "Done." +fi + +which $INSTALL_NSB_BIN/yajl > /dev/null +if [ -f "/usr/local/lib/libyajl.so.2.1.1" ] +then + echo "ovs stats libs already installed." +else + echo "installing ovs stats libraries" + pushd . + + cd $INSTALL_NSB_BIN + git clone https://github.com/lloyd/yajl.git + pushd yajl + ./configure + make + make install + popd + + popd +fi + +which $INSTALL_NSB_BIN/collectd/collectd >/dev/null if [ $? -eq 0 ] then echo "Collectd already installed. Done" @@ -62,9 +125,9 @@ else git clone https://github.com/collectd/collectd.git pushd collectd git stash - git checkout -b collectd 43a4db3b3209f497a0ba408aebf8aee385c6262d + git checkout -n nfvi 47c86ace348a1d7a5352a83d10935209f89aa4f5 ./build.sh - ./configure --with-libpqos=/usr/ + ./configure --with-libpqos=/usr/ --with-libdpdk=/usr --with-libyajl=/usr/local --enable-debug --enable-dpdkstat --enable-virt --enable-ovs_stats make install > /dev/null popd echo "Done." diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py index 18b0d8952..ce09b6597 100644 --- a/yardstick/network_services/nfvi/resource.py +++ b/yardstick/network_services/nfvi/resource.py @@ -14,19 +14,28 @@ """ Resource collection definitions """ from __future__ import absolute_import +from __future__ import print_function +import tempfile import logging +import os import os.path import re import multiprocessing +from collections import Sequence + from oslo_config import cfg from yardstick import ssh from yardstick.network_services.nfvi.collectd import AmqpConsumer from yardstick.network_services.utils import provision_tool +LOG = logging.getLogger(__name__) + CONF = cfg.CONF ZMQ_OVS_PORT = 5567 ZMQ_POLLING_TIME = 12000 +LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "intel_rdt", "memory", + "hugepages", "dpdkstat", "virt", "ovs_stats"] class ResourceProfile(object): @@ -34,16 +43,17 @@ class ResourceProfile(object): This profile adds a resource at the beginning of the test session """ - def __init__(self, vnfd, cores): + def __init__(self, mgmt, interfaces=None, cores=None): self.enable = True self.connection = None - self.cores = cores + self.cores = cores if isinstance(cores, Sequence) else [] + self._queue = multiprocessing.Queue() + self.amqp_client = None + self.interfaces = interfaces if isinstance(interfaces, Sequence) else [] - mgmt_interface = vnfd.get("mgmt-interface") # why the host or ip? - self.vnfip = mgmt_interface.get("host", mgmt_interface["ip"]) - self.connection = ssh.SSH.from_node(mgmt_interface, - overrides={"ip": self.vnfip}) + self.vnfip = mgmt.get("host", mgmt["ip"]) + self.connection = ssh.SSH.from_node(mgmt, overrides={"ip": self.vnfip}) self.connection.wait() @@ -52,81 +62,147 @@ class ResourceProfile(object): err, pid, _ = self.connection.execute("pgrep -f %s" % process) return [err == 0, pid] - def run_collectd_amqp(self, queue): + def run_collectd_amqp(self): """ run amqp consumer to collect the NFVi data """ - amqp = \ - AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip), - queue) + amqp_url = 'amqp://admin:admin@{}:5672/%2F'.format(self.vnfip) + amqp = AmqpConsumer(amqp_url, self._queue) try: amqp.run() except (AttributeError, RuntimeError, KeyboardInterrupt): amqp.stop() @classmethod - def get_cpu_data(cls, reskey, value): + def parse_simple_resource(cls, key, value): + return {'/'.join(key): value.split(":")[1]} + + @classmethod + def get_cpu_data(cls, key_split, value): """ Get cpu topology of the host """ pattern = r"-(\d+)" - if "cpufreq" in reskey[1]: - match = re.search(pattern, reskey[2], re.MULTILINE) - metric = reskey[1] + if "cpufreq" in key_split[0]: + metric = key_split[0] + source = key_split[1] else: - match = re.search(pattern, reskey[1], re.MULTILINE) - metric = reskey[2] + metric = key_split[1] + source = key_split[0] + + match = re.search(pattern, source, re.MULTILINE) + if not match: + return "error", "Invalid", "", "" + + time, value = value.split(":") + return str(match.group(1)), metric, value, time + + @classmethod + def parse_hugepages(cls, key, value): + return cls.parse_simple_resource(key, value) - time, val = re.split(":", value) - if match: - return [str(match.group(1)), metric, val, time] + @classmethod + def parse_dpdkstat(cls, key, value): + return cls.parse_simple_resource(key, value) + + @classmethod + def parse_virt(cls, key, value): + return cls.parse_simple_resource(key, value) - return ["error", "Invalid", ""] + @classmethod + def parse_ovs_stats(cls, key, value): + return cls.parse_simple_resource(key, value) - def parse_collectd_result(self, metrics, listcores): + def parse_collectd_result(self, metrics, core_list): """ convert collectd data into json""" - res = {"cpu": {}, "memory": {}} + result = { + "cpu": {}, + "memory": {}, + "hugepages": {}, + "dpdkstat": {}, + "virt": {}, + "ovs_stats": {}, + } testcase = "" for key, value in metrics.items(): - reskey = key.rsplit("/") - if "cpu" in reskey[1] or "intel_rdt" in reskey[1]: - cpu_key, name, metric, testcase = \ - self.get_cpu_data(reskey, value) - if cpu_key in listcores: - res["cpu"].setdefault(cpu_key, {}).update({name: metric}) - elif "memory" in reskey[1]: - val = re.split(":", value)[1] - res["memory"].update({reskey[2]: val}) - res["timestamp"] = testcase - - return res - - def amqp_collect_nfvi_kpi(self, _queue=multiprocessing.Queue()): + key_split = key.split("/") + res_key_iter = (key for key in key_split if "nsb_stats" not in key) + res_key0 = next(res_key_iter) + res_key1 = next(res_key_iter) + + if "cpu" in res_key0 or "intel_rdt" in res_key0: + cpu_key, name, metric, testcase = self.get_cpu_data(key_split, value) + if cpu_key in core_list: + result["cpu"].setdefault(cpu_key, {}).update({name: metric}) + + elif "memory" in res_key0: + result["memory"].update({res_key1: value.split(":")[0]}) + + elif "hugepages" in res_key0: + result["hugepages"].update(self.parse_hugepages(key, value)) + + elif "dpdkstat" in res_key0: + result["dpdkstat"].update(self.parse_dpdkstat(key, value)) + + elif "virt" in res_key1: + result["virt"].update(self.parse_virt(key, value)) + + elif "ovs_stats" in res_key0: + result["ovs_stats"].update(self.parse_ovs_stats(key, value)) + + result["timestamp"] = testcase + + return result + + def amqp_process_for_nfvi_kpi(self): """ amqp collect and return nfvi kpis """ - try: - metric = {} - amqp_client = \ - multiprocessing.Process(target=self.run_collectd_amqp, - args=(_queue,)) - amqp_client.start() - amqp_client.join(7) - amqp_client.terminate() - - while not _queue.empty(): - metric.update(_queue.get()) - except (AttributeError, RuntimeError, TypeError, ValueError): - logging.debug("Failed to get NFVi stats...") - msg = {} - else: - msg = self.parse_collectd_result(metric, self.cores) + if self.amqp_client is None: + self.amqp_client = \ + multiprocessing.Process(target=self.run_collectd_amqp) + self.amqp_client.start() + def amqp_collect_nfvi_kpi(self): + """ amqp collect and return nfvi kpis """ + metric = {} + while not self._queue.empty(): + metric.update(self._queue.get()) + msg = self.parse_collectd_result(metric, self.cores) return msg - @classmethod - def _start_collectd(cls, connection, bin_path): - connection.execute('pkill -9 collectd') + def _provide_config_file(self, bin_path, nfvi_cfg, kwargs): + with open(os.path.join(bin_path, nfvi_cfg), 'r') as cfg: + template = cfg.read() + cfg, cfg_content = tempfile.mkstemp() + with os.fdopen(cfg, "w+") as cfg: + cfg.write(template.format(**kwargs)) + cfg_file = os.path.join(bin_path, nfvi_cfg) + self.connection.put(cfg_content, cfg_file) + + def _prepare_collectd_conf(self, bin_path): + """ Prepare collectd conf """ + loadplugin = "\n".join("LoadPlugin {0}".format(plugin) + for plugin in LIST_PLUGINS_ENABLED) + + interfaces = "\n".join("PortName '{0[name]}'".format(interface) + for interface in self.interfaces) + + kwargs = { + "interval": '25', + "loadplugin": loadplugin, + "dpdk_interface": interfaces, + } + + self._provide_config_file(bin_path, 'collectd.conf', kwargs) + + def _start_collectd(self, connection, bin_path): + LOG.debug("Starting collectd to collect NFVi stats") + # temp disable + return + connection.execute('sudo pkill -9 collectd') collectd = os.path.join(bin_path, "collectd.sh") provision_tool(connection, collectd) - provision_tool(connection, os.path.join(bin_path, "collectd.conf")) + self._prepare_collectd_conf(bin_path) # Reset amqp queue + LOG.debug("reset and setup amqp to collect data from collectd") + connection.execute("sudo rm -rf /var/lib/rabbitmq/mnesia/rabbit*") connection.execute("sudo service rabbitmq-server start") connection.execute("sudo rabbitmqctl stop_app") connection.execute("sudo rabbitmqctl reset") @@ -134,8 +210,15 @@ class ResourceProfile(object): connection.execute("sudo service rabbitmq-server restart") # Run collectd - connection.execute(collectd) - connection.execute(os.path.join(bin_path, "collectd", "collectd")) + + http_proxy = os.environ.get('http_proxy', '') + https_proxy = os.environ.get('https_proxy', '') + connection.execute("sudo %s '%s' '%s'" % + (collectd, http_proxy, https_proxy)) + LOG.debug("Start collectd service.....") + connection.execute( + "sudo %s" % os.path.join(bin_path, "collectd", "collectd")) + LOG.debug("Done") def initiate_systemagent(self, bin_path): """ Start system agent for NFVi collection on host """ @@ -145,16 +228,24 @@ class ResourceProfile(object): def start(self): """ start nfvi collection """ if self.enable: - logging.debug("Start NVFi metric collection...") + LOG.debug("Start NVFi metric collection...") def stop(self): """ stop nfvi collection """ - if self.enable: - agent = "collectd" - logging.debug("Stop resource monitor...") - status, pid = self.check_if_sa_running(agent) - if status: - self.connection.execute('kill -9 %s' % pid) - self.connection.execute('pkill -9 %s' % agent) - self.connection.execute('service rabbitmq-server stop') - self.connection.execute("sudo rabbitmqctl stop_app") + if not self.enable: + return + + agent = "collectd" + LOG.debug("Stop resource monitor...") + + if self.amqp_client is not None: + self.amqp_client.terminate() + + status, pid = self.check_if_sa_running(agent) + if status == 0: + return + + self.connection.execute('sudo kill -9 %s' % pid) + self.connection.execute('sudo pkill -9 %s' % agent) + self.connection.execute('sudo service rabbitmq-server stop') + self.connection.execute("sudo rabbitmqctl stop_app") diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 89c086d97..e08f51784 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -331,7 +331,8 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.socket = 1 cores = self._validate_cpu_cfg() - return ResourceProfile(self.vnfd_helper, cores) + return ResourceProfile(self.vnfd_helper.mgmt_interface, + interfaces=self.vnfd_helper.interfaces, cores=cores) def _detect_drivers(self): interfaces = self.vnfd_helper.interfaces -- cgit 1.2.3-korg