diff options
author | Deepak S <deepak.s@linux.intel.com> | 2016-12-30 09:22:25 -0800 |
---|---|---|
committer | Deepak S <deepak.s@linux.intel.com> | 2017-01-19 08:28:10 +0530 |
commit | ddb76faa5841997bd3eec4ed2f3d33f56e66d0c3 (patch) | |
tree | 2970946f924aedcc318158d19e24a3252cdf0cd5 | |
parent | 4c02cf5d19e2c36c9747a1c05d86331a1918baec (diff) |
Add infrastructure to add the NFVi KPI collections
This patches added common function to collect NFVi KPIs for given usecases
- Core KPIs like memory/LLC/IPC etc
- OVS stats
- memory stats etc.
JIRA: YARDSTICK-488
Change-Id: Iab41146392efc47b7313b1846a67728a44d0f1d6
Signed-off-by: Deepak S <deepak.s@linux.intel.com>
-rw-r--r-- | tests/unit/network_services/nfvi/__init__.py | 0 | ||||
-rw-r--r-- | tests/unit/network_services/nfvi/test_collectd.py | 160 | ||||
-rw-r--r-- | tests/unit/network_services/nfvi/test_resource.py | 171 | ||||
-rw-r--r-- | yardstick/network_services/nfvi/__init__.py | 0 | ||||
-rw-r--r-- | yardstick/network_services/nfvi/collectd.conf | 80 | ||||
-rw-r--r-- | yardstick/network_services/nfvi/collectd.py | 158 | ||||
-rwxr-xr-x | yardstick/network_services/nfvi/collectd.sh | 89 | ||||
-rw-r--r-- | yardstick/network_services/nfvi/resource.py | 162 |
8 files changed, 820 insertions, 0 deletions
diff --git a/tests/unit/network_services/nfvi/__init__.py b/tests/unit/network_services/nfvi/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/unit/network_services/nfvi/__init__.py diff --git a/tests/unit/network_services/nfvi/test_collectd.py b/tests/unit/network_services/nfvi/test_collectd.py new file mode 100644 index 000000000..5bd7196df --- /dev/null +++ b/tests/unit/network_services/nfvi/test_collectd.py @@ -0,0 +1,160 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +import unittest +import multiprocessing +import mock + +from yardstick.network_services.nfvi.collectd import AmqpConsumer + + +class TestAmqpConsumer(unittest.TestCase): + def setUp(self): + self.queue = multiprocessing.Queue() + self.url = 'amqp://admin:admin@1.1.1.1:5672/%2F' + self.amqp_consumer = AmqpConsumer(self.url, self.queue) + + def test___init__(self): + self.assertEqual(self.url, self.amqp_consumer._url) + + def test_connect(self): + self.assertRaises(RuntimeError, self.amqp_consumer.connect) + + def test_on_connection_open(self): + self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer) + self.amqp_consumer._connection.add_on_close_callback = \ + mock.Mock(return_value=0) + self.amqp_consumer._connection.channel = mock.Mock(return_value=0) + self.assertEqual(None, self.amqp_consumer.on_connection_open(10)) + + def test_on_connection_closed(self): + self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer) + self.amqp_consumer._connection.ioloop = mock.Mock() + self.amqp_consumer._connection.ioloop.stop = mock.Mock(return_value=0) + self.amqp_consumer._connection.add_timeout = mock.Mock(return_value=0) + self.amqp_consumer._closing = True + self.assertEqual(None, + self.amqp_consumer.on_connection_closed("", 404, + "Not Found")) + self.amqp_consumer._closing = False + self.assertEqual(None, + self.amqp_consumer.on_connection_closed("", 404, + "Not Found")) + + def test_reconnect(self): + self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer) + self.amqp_consumer._connection.ioloop = mock.Mock() + self.amqp_consumer._connection.ioloop.stop = mock.Mock(return_value=0) + self.amqp_consumer.connect = mock.Mock(return_value=0) + self.amqp_consumer._closing = True + self.assertEqual(None, self.amqp_consumer.reconnect()) + + def test_on_channel_open(self): + self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer) + self.amqp_consumer._connection.add_on_close_callback = \ + mock.Mock(return_value=0) + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer.add_on_channel_close_callback = mock.Mock() + self.amqp_consumer._channel.exchange_declare = \ + mock.Mock(return_value=0) + self.assertEqual(None, + self.amqp_consumer.on_channel_open( + self.amqp_consumer._channel)) + + def test_add_on_channel_close_callback(self): + self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer) + self.amqp_consumer._connection.add_on_close_callback = \ + mock.Mock(return_value=0) + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer._channel.add_on_close_callback = mock.Mock() + self.assertEqual(None, + self.amqp_consumer.add_on_channel_close_callback()) + + def test_on_channel_closed(self): + self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer) + self.amqp_consumer._connection.close = mock.Mock(return_value=0) + _channel = mock.Mock() + self.assertEqual(None, + self.amqp_consumer.on_channel_closed(_channel, + "", "")) + + def test_ion_exchange_declareok(self): + self.amqp_consumer.setup_queue = mock.Mock(return_value=0) + self.assertEqual(None, self.amqp_consumer.on_exchange_declareok(10)) + + def test_setup_queue(self): + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer._channel.add_on_close_callback = mock.Mock() + self.assertEqual(None, self.amqp_consumer.setup_queue("collectd")) + + def test_on_queue_declareok(self): + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer._channel.queue_bind = mock.Mock() + self.assertEqual(None, self.amqp_consumer.on_queue_declareok(10)) + + def test__on_bindok(self): + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer._channel.basic_consume = mock.Mock() + self.amqp_consumer.add_on_cancel_callback = mock.Mock() + self.assertEqual(None, self.amqp_consumer._on_bindok(10)) + + def test_add_on_cancel_callback(self): + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer._channel.add_on_cancel_callback = mock.Mock() + self.assertEqual(None, self.amqp_consumer.add_on_cancel_callback()) + + def test_on_consumer_cancelled(self): + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer._channel.close = mock.Mock() + self.assertEqual(None, self.amqp_consumer.on_consumer_cancelled(10)) + + def test_on_message(self): + body = "msg {} cpu/cpu-0/ipc 101010:10" + properties = "" + basic_deliver = mock.Mock() + basic_deliver.delivery_tag = mock.Mock(return_value=0) + self.amqp_consumer.ack_message = mock.Mock() + self.assertEqual(None, + self.amqp_consumer.on_message(10, basic_deliver, + properties, body)) + + def test_ack_message(self): + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer._channel.basic_ack = mock.Mock() + self.assertEqual(None, self.amqp_consumer.ack_message(10)) + + def test_on_cancelok(self): + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer._channel.close = mock.Mock() + self.assertEqual(None, self.amqp_consumer.on_cancelok(10)) + + def test_run(self): + self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer) + self.amqp_consumer.connect = mock.Mock() + self.amqp_consumer._connection.ioloop.start = mock.Mock() + self.assertEqual(None, self.amqp_consumer.run()) + + def test_stop(self): + self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer) + self.amqp_consumer.connect = mock.Mock() + self.amqp_consumer._connection.ioloop.start = mock.Mock() + self.amqp_consumer._channel = mock.Mock() + self.amqp_consumer._channel.basic_cancel = mock.Mock() + self.assertEqual(None, self.amqp_consumer.stop()) + + def test_close_connection(self): + self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer) + self.amqp_consumer._connection.close = mock.Mock() + self.assertEqual(None, self.amqp_consumer.close_connection()) diff --git a/tests/unit/network_services/nfvi/test_resource.py b/tests/unit/network_services/nfvi/test_resource.py new file mode 100644 index 000000000..90910d89b --- /dev/null +++ b/tests/unit/network_services/nfvi/test_resource.py @@ -0,0 +1,171 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +import unittest +import multiprocessing +import mock + +from yardstick.network_services.nfvi.resource import ResourceProfile +from yardstick.network_services.nfvi import resource, collectd + + +class TestResourceProfile(unittest.TestCase): + VNFD = {'vnfd:vnfd-catalog': + {'vnfd': + [{'short-name': 'VpeVnf', + 'vdu': + [{'routing_table': + [{'network': '152.16.100.20', + 'netmask': '255.255.255.0', + 'gateway': '152.16.100.20', + 'if': 'xe0'}, + {'network': '152.16.40.20', + 'netmask': '255.255.255.0', + 'gateway': '152.16.40.20', + 'if': 'xe1'}], + 'description': 'VPE approximation using DPDK', + 'name': 'vpevnf-baremetal', + 'nd_route_tbl': + [{'network': '0064:ff9b:0:0:0:0:9810:6414', + 'netmask': '112', + 'gateway': '0064:ff9b:0:0:0:0:9810:6414', + 'if': 'xe0'}, + {'network': '0064:ff9b:0:0:0:0:9810:2814', + 'netmask': '112', + 'gateway': '0064:ff9b:0:0:0:0:9810:2814', + 'if': 'xe1'}], + 'id': 'vpevnf-baremetal', + 'external-interface': + [{'virtual-interface': + {'dst_mac': '3c:fd:fe:9e:64:38', + 'vpci': '0000:05:00.0', + 'local_ip': '152.16.100.19', + 'type': 'PCI-PASSTHROUGH', + 'netmask': '255.255.255.0', + 'dpdk_port_num': '0', + 'bandwidth': '10 Gbps', + 'dst_ip': '152.16.100.20', + 'local_mac': '3c:fd:fe:a1:2b:80'}, + 'vnfd-connection-point-ref': 'xe0', + 'name': 'xe0'}, + {'virtual-interface': + {'dst_mac': '00:1e:67:d0:60:5c', + 'vpci': '0000:05:00.1', + 'local_ip': '152.16.40.19', + 'type': 'PCI-PASSTHROUGH', + 'netmask': '255.255.255.0', + 'dpdk_port_num': '1', + 'bandwidth': '10 Gbps', + 'dst_ip': '152.16.40.20', + 'local_mac': '3c:fd:fe:a1:2b:81'}, + 'vnfd-connection-point-ref': 'xe1', + 'name': 'xe1'}]}], + 'description': 'Vpe approximation using DPDK', + 'mgmt-interface': + {'vdu-id': 'vpevnf-baremetal', + 'host': '1.1.1.1', + 'password': 'r00t', + 'user': 'root', + 'ip': '1.1.1.1'}, + 'benchmark': + {'kpi': ['packets_in', 'packets_fwd', 'packets_dropped']}, + 'connection-point': [{'type': 'VPORT', 'name': 'xe0'}, + {'type': 'VPORT', 'name': 'xe1'}], + 'id': 'VpeApproxVnf', 'name': 'VPEVnfSsh'}]}} + + def setUp(self): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, {}, "")) + ssh.return_value = ssh_mock + + self.resource_profile = \ + ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0], + [1, 2, 3]) + + def test___init__(self): + self.assertEqual(True, self.resource_profile.enable) + + def test_check_if_sa_running(self): + self.assertEqual(self.resource_profile.check_if_sa_running("collectd"), + [True, {}]) + + def test_amqp_collect_nfvi_kpi(self): + _queue = multiprocessing.Queue() + _queue.put({"cpu/cpu-0/ipc": "ipc:10"}) + amqp = self.resource_profile.amqp_collect_nfvi_kpi(_queue) + _queue.put({"/memory/bandwidth": "local:10"}) + amqp = self.resource_profile.amqp_collect_nfvi_kpi(_queue) + expected = {'timestamp': '', 'cpu': {}, 'memory': {'bandwidth': '10'}} + self.assertDictEqual(expected, amqp) + + def test_amqp_collect_nfvi_kpi_exception(self): + amqp = self.resource_profile.amqp_collect_nfvi_kpi({}) + self.assertDictEqual({}, amqp) + + def test_get_cpu_data(self): + reskey = ["", "cpufreq", "cpufreq-0"] + value = "metric:10" + val = self.resource_profile.get_cpu_data(reskey, value) + self.assertEqual(val, ['0', 'cpufreq', '10', 'metric']) + + def test_get_cpu_data_error(self): + reskey = ["", "", ""] + value = "metric:10" + val = self.resource_profile.get_cpu_data(reskey, value) + self.assertEqual(val, ['error', 'Invalid', '']) + + def test__start_collectd(self): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, "", "")) + ssh.return_value = ssh_mock + resource_profile = \ + ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0], + [1, 2, 3]) + self.assertIsNone( + resource_profile._start_collectd(ssh_mock, "/opt/nsb_bin")) + + def test_initiate_systemagent(self): + with mock.patch("yardstick.ssh.SSH") as ssh: + ssh_mock = mock.Mock(autospec=ssh.SSH) + ssh_mock.execute = \ + mock.Mock(return_value=(0, "", "")) + ssh.return_value = ssh_mock + resource_profile = \ + ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0], + [1, 2, 3]) + self.assertIsNone( + resource_profile.initiate_systemagent("/opt/nsb_bin")) + + def test_parse_collectd_result(self): + res = self.resource_profile.parse_collectd_result({}, [0, 1, 2]) + self.assertDictEqual(res, {'timestamp': '', 'cpu': {}, 'memory': {}}) + + def test_run_collectd_amqp(self): + _queue = multiprocessing.Queue() + resource.AmqpConsumer = mock.Mock(autospec=collectd) + self.assertIsNone(self.resource_profile.run_collectd_amqp(_queue)) + + def test_start(self): + self.assertIsNone(self.resource_profile.start()) + + def test_stop(self): + self.assertIsNone(self.resource_profile.stop()) + +if __name__ == '__main__': + unittest.main() diff --git a/yardstick/network_services/nfvi/__init__.py b/yardstick/network_services/nfvi/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/yardstick/network_services/nfvi/__init__.py diff --git a/yardstick/network_services/nfvi/collectd.conf b/yardstick/network_services/nfvi/collectd.conf new file mode 100644 index 000000000..abcf24ded --- /dev/null +++ b/yardstick/network_services/nfvi/collectd.conf @@ -0,0 +1,80 @@ +# Config file for collectd(1). +# +# Some plugins need additional configuration and are disabled by default. +# Please read collectd.conf(5) for details. +# +# You should also read /usr/share/doc/collectd-core/README.Debian.plugins +# before enabling any more plugins. + +############################################################################## +# Global # +#----------------------------------------------------------------------------# +# Global settings for the daemon. # +############################################################################## + +Hostname "nsb_stats" +FQDNLookup true + +Interval 5 + +############################################################################## +# LoadPlugin section # +#----------------------------------------------------------------------------# +# Specify what features to activate. # +############################################################################## + +LoadPlugin amqp +LoadPlugin cpu +LoadPlugin intel_rdt +LoadPlugin memory + +############################################################################## +# Plugin configuration # +#----------------------------------------------------------------------------# +# In this section configuration stubs for each plugin are provided. A desc- # +# ription of those options is available in the collectd.conf(5) manual page. # +############################################################################## + +<Plugin amqp> + <Publish "name"> + Host "0.0.0.0" + Port "5672" + VHost "/" + User "admin" + Password "admin" + Exchange "amq.fanout" + RoutingKey "collectd" + Persistent false + StoreRates false + ConnectionRetryDelay 0 + </Publish> +</Plugin> + +<Plugin cpu> + ReportByCpu true + ReportByState true + ValuesPercentage false +</Plugin> + +<Plugin memory> + ValuesAbsolute true + ValuesPercentage false +</Plugin> + +<LoadPlugin intel_rdt> + Interval 5 +</LoadPlugin> +<Plugin "intel_rdt"> + Cores "" +</Plugin> + +<Plugin memcached> + <Instance "local"> + Host "127.0.0.1" + Port "11211" + </Instance> +</Plugin> + +<Include "/etc/collectd/collectd.conf.d"> + Filter "*.conf" +</Include> diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py new file mode 100644 index 000000000..ea80e4ff8 --- /dev/null +++ b/yardstick/network_services/nfvi/collectd.py @@ -0,0 +1,158 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" AMQP Consumer senario definition """ + +from __future__ import absolute_import +from __future__ import print_function +import logging +import pika +from pika.exceptions import AMQPConnectionError + + +class AmqpConsumer(object): + """ This Class handles amqp consumer and collects collectd data """ + EXCHANGE = 'amq.fanout' + EXCHANGE_TYPE = 'fanout' + QUEUE = '' + ROUTING_KEY = 'collectd' + + def __init__(self, amqp_url, queue): + self._connection = None + self._channel = None + self._closing = False + self._consumer_tag = None + self._url = amqp_url + self._queue = queue + + def connect(self): + """ connect to amqp url """ + try: + return pika.SelectConnection(pika.URLParameters(self._url), + self.on_connection_open, + stop_ioloop_on_close=False) + except AMQPConnectionError: + raise RuntimeError + + def on_connection_open(self, unused_connection): + """ call back from pika & open channel """ + logging.info("list of unused connections %s", unused_connection) + self._connection.add_on_close_callback(self.on_connection_closed) + self._connection.channel(on_open_callback=self.on_channel_open) + + def on_connection_closed(self, connection, reply_code, reply_text): + """ close the amqp connections. if force close, try re-connect """ + logging.info("amqp connection (%s)", connection) + self._channel = None + if self._closing: + self._connection.ioloop.stop() + else: + logging.debug(('Connection closed, reopening in 5 sec: (%s) %s', + reply_code, reply_text)) + self._connection.add_timeout(5, self.reconnect) + + def reconnect(self): + """ re-connect amqp consumer""" + self._connection.ioloop.stop() + + if not self._closing: + self._connection = self.connect() + self._connection.ioloop.start() + + def on_channel_open(self, channel): + """ add close callback & setup exchange """ + self._channel = channel + self.add_on_channel_close_callback() + self._channel.exchange_declare(self.on_exchange_declareok, + self.EXCHANGE, + self.EXCHANGE_TYPE, + durable=True, auto_delete=False) + + def add_on_channel_close_callback(self): + """ register for close callback """ + self._channel.add_on_close_callback(self.on_channel_closed) + + def on_channel_closed(self, channel, reply_code, reply_text): + """ close amqp channel connection """ + logging.info("amqp channel closed channel(%s), " + "reply_code(%s) reply_text(%s)", + channel, reply_code, reply_text) + self._connection.close() + + def on_exchange_declareok(self, unused_frame): + """ if exchange declare is ok, setup queue """ + logging.info("amqp exchange unused frame (%s)", unused_frame) + self.setup_queue(self.QUEUE) + + def setup_queue(self, queue_name): + """ setup queue & declare same with channel """ + logging.info("amqp queue name (%s)", queue_name) + self._channel.queue_declare(self.on_queue_declareok, queue_name) + + def on_queue_declareok(self, method_frame): + """ bind queue to channel """ + logging.info("amqp queue method frame (%s)", method_frame) + self._channel.queue_bind(self._on_bindok, self.QUEUE, + self.EXCHANGE, self.ROUTING_KEY) + + def _on_bindok(self, unused_frame): + """ call back on bind start consuming data from amqp queue """ + logging.info("amqp unused frame %s", unused_frame) + self.add_on_cancel_callback() + self._consumer_tag = self._channel.basic_consume(self.on_message, + self.QUEUE) + + def add_on_cancel_callback(self): + """ add cancel func to amqp callback """ + self._channel.add_on_cancel_callback(self.on_consumer_cancelled) + + def on_consumer_cancelled(self, method_frame): + """ on cancel close the channel """ + logging.info("amqp method frame %s", method_frame) + if self._channel: + self._channel.close() + + def on_message(self, unused_channel, basic_deliver, properties, body): + """ parse received data from amqp server (collectd) """ + logging.info("amqp unused channel %s, properties %s", + unused_channel, properties) + metrics = body.rsplit() + self._queue.put({metrics[1]: metrics[3]}) + self.ack_message(basic_deliver.delivery_tag) + + def ack_message(self, delivery_tag): + """ acknowledge amqp msg """ + self._channel.basic_ack(delivery_tag) + + def on_cancelok(self, unused_frame): + """ initiate amqp close channel on callback """ + logging.info("amqp unused frame %s", unused_frame) + self._channel.close() + + def run(self): + """ Initiate amqp connection. """ + self._connection = self.connect() + self._connection.ioloop.start() + + def stop(self): + """ stop amqp consuming data """ + self._closing = True + if self._channel: + self._channel.basic_cancel(self.on_cancelok, self._consumer_tag) + + if self._connection: + self._connection.ioloop.start() + + def close_connection(self): + """ close amqp connection """ + self._connection.close() diff --git a/yardstick/network_services/nfvi/collectd.sh b/yardstick/network_services/nfvi/collectd.sh new file mode 100755 index 000000000..7acb40431 --- /dev/null +++ b/yardstick/network_services/nfvi/collectd.sh @@ -0,0 +1,89 @@ +#!/bin/bash +# +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +INSTALL_NSB_BIN="/opt/nsb_bin" +cd $INSTALL_NSB_BIN + +if [ "$(whoami)" != "root" ]; then + echo "Must be root to run $0" + exit 1; +fi + +echo "Install required libraries to run collectd..." +pkg=(git flex bison build-essential pkg-config automake autotools-dev libltdl-dev librabbitmq-dev rabbitmq-server) +for i in "${pkg[@]}"; do +dpkg-query -W --showformat='${Status}\n' "${i}"|grep "install ok installed" + if [ "$?" -eq "1" ]; then + apt-get -y install "${i}"; + fi +done +echo "Done" + +ldconfig -p | grep libpqos >/dev/null +if [ $? -eq 0 ] +then + echo "Intel RDT library already installed. Done" +else + pushd . + + echo "Get intel_rdt repo and install..." + rm -rf intel-cmt-cat >/dev/null + git clone https://github.com/01org/intel-cmt-cat.git + pushd intel-cmt-cat + git checkout tags/v1.5 -b v1.5 + make install PREFIX=/usr + popd + + popd + echo "Done." +fi + +which /opt/nsb_bin/collectd/collectd >/dev/null +if [ $? -eq 0 ] +then + echo "Collectd already installed. Done" +else + pushd . + echo "Get collectd from repository and install..." + rm -rf collectd >/dev/null + git clone https://github.com/collectd/collectd.git + pushd collectd + git stash + git checkout -b collectd 43a4db3b3209f497a0ba408aebf8aee385c6262d + ./build.sh + ./configure --with-libpqos=/usr/ + make install > /dev/null + popd + echo "Done." + popd +fi + +modprobe msr +cp $INSTALL_NSB_BIN/collectd.conf /opt/collectd/etc/ + +echo "Check if admin user already created" +rabbitmqctl list_users | grep '^admin$' > /dev/null +if [ $? -eq 0 ]; +then + echo "'admin' user already created..." +else + echo "Creating 'admin' user for collectd data export..." + rabbitmqctl delete_user guest + rabbitmqctl add_user admin admin + rabbitmqctl authenticate_user admin admin + rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" + echo "Done" +fi diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py new file mode 100644 index 000000000..d71e1e995 --- /dev/null +++ b/yardstick/network_services/nfvi/resource.py @@ -0,0 +1,162 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" Resource collection definitions """ + +from __future__ import absolute_import +import logging +import os.path +import re +import multiprocessing +from oslo_config import cfg + +from yardstick import ssh +from yardstick.network_services.nfvi.collectd import AmqpConsumer +from yardstick.network_services.utils import provision_tool + +CONF = cfg.CONF +ZMQ_OVS_PORT = 5567 +ZMQ_POLLING_TIME = 12000 + + +class ResourceProfile(object): + """ + This profile adds a resource at the beginning of the test session + """ + + def __init__(self, vnfd, cores): + self.enable = True + self.connection = None + self.cores = cores + + mgmt_interface = vnfd.get("mgmt-interface") + user = mgmt_interface.get("user") + passwd = mgmt_interface.get("password") + ip_addr = mgmt_interface.get("ip") + self.vnfip = mgmt_interface.get("host", ip_addr) + ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT) + self.connection = ssh.SSH(user, self.vnfip, + password=passwd, port=ssh_port) + self.connection.wait() + + def check_if_sa_running(self, process): + """ verify if system agent is running """ + err, pid, _ = self.connection.execute("pgrep -f %s" % process) + return [err == 0, pid] + + def run_collectd_amqp(self, queue): + """ run amqp consumer to collect the NFVi data """ + amqp = \ + AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip), + queue) + try: + amqp.run() + except (AttributeError, RuntimeError, KeyboardInterrupt): + amqp.stop() + + @classmethod + def get_cpu_data(cls, reskey, value): + """ Get cpu topology of the host """ + pattern = r"-(\d+)" + if "cpufreq" in reskey[1]: + match = re.search(pattern, reskey[2], re.MULTILINE) + metric = reskey[1] + else: + match = re.search(pattern, reskey[1], re.MULTILINE) + metric = reskey[2] + + time, val = re.split(":", value) + if match: + return [str(match.group(1)), metric, val, time] + + return ["error", "Invalid", ""] + + def parse_collectd_result(self, metrics, listcores): + """ convert collectd data into json""" + res = {"cpu": {}, "memory": {}} + testcase = "" + + for key, value in metrics.items(): + reskey = key.rsplit("/") + if "cpu" in reskey[1] or "intel_rdt" in reskey[1]: + cpu_key, name, metric, testcase = \ + self.get_cpu_data(reskey, value) + if cpu_key in listcores: + res["cpu"].setdefault(cpu_key, {}).update({name: metric}) + elif "memory" in reskey[1]: + val = re.split(":", value)[1] + res["memory"].update({reskey[2]: val}) + res["timestamp"] = testcase + + return res + + def amqp_collect_nfvi_kpi(self, _queue=multiprocessing.Queue()): + """ amqp collect and return nfvi kpis """ + try: + metric = {} + amqp_client = \ + multiprocessing.Process(target=self.run_collectd_amqp, + args=(_queue,)) + amqp_client.start() + amqp_client.join(7) + amqp_client.terminate() + + while not _queue.empty(): + metric.update(_queue.get()) + except (AttributeError, RuntimeError, TypeError, ValueError): + logging.debug("Failed to get NFVi stats...") + msg = {} + else: + msg = self.parse_collectd_result(metric, self.cores) + + return msg + + @classmethod + def _start_collectd(cls, connection, bin_path): + connection.execute('pkill -9 collectd') + collectd = os.path.join(bin_path, "collectd.sh") + provision_tool(connection, collectd) + provision_tool(connection, os.path.join(bin_path, "collectd.conf")) + + # Reset amqp queue + connection.execute("sudo service rabbitmq-server start") + connection.execute("sudo rabbitmqctl stop_app") + connection.execute("sudo rabbitmqctl reset") + connection.execute("sudo rabbitmqctl start_app") + connection.execute("sudo service rabbitmq-server restart") + + # Run collectd + connection.execute(collectd) + connection.execute(os.path.join(bin_path, "collectd", "collectd")) + + def initiate_systemagent(self, bin_path): + """ Start system agent for NFVi collection on host """ + if self.enable: + self._start_collectd(self.connection, bin_path) + + def start(self): + """ start nfvi collection """ + if self.enable: + logging.debug("Start NVFi metric collection...") + + def stop(self): + """ stop nfvi collection """ + if self.enable: + agent = "collectd" + logging.debug("Stop resource monitor...") + status, pid = self.check_if_sa_running(agent) + if status: + self.connection.execute('kill -9 %s' % pid) + self.connection.execute('pkill -9 %s' % agent) + self.connection.execute('service rabbitmq-server stop') + self.connection.execute("sudo rabbitmqctl stop_app") |