aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDeepak S <deepak.s@linux.intel.com>2016-12-30 09:22:25 -0800
committerDeepak S <deepak.s@linux.intel.com>2017-01-19 08:28:10 +0530
commitddb76faa5841997bd3eec4ed2f3d33f56e66d0c3 (patch)
tree2970946f924aedcc318158d19e24a3252cdf0cd5
parent4c02cf5d19e2c36c9747a1c05d86331a1918baec (diff)
Add infrastructure to add the NFVi KPI collections
This patches added common function to collect NFVi KPIs for given usecases - Core KPIs like memory/LLC/IPC etc - OVS stats - memory stats etc. JIRA: YARDSTICK-488 Change-Id: Iab41146392efc47b7313b1846a67728a44d0f1d6 Signed-off-by: Deepak S <deepak.s@linux.intel.com>
-rw-r--r--tests/unit/network_services/nfvi/__init__.py0
-rw-r--r--tests/unit/network_services/nfvi/test_collectd.py160
-rw-r--r--tests/unit/network_services/nfvi/test_resource.py171
-rw-r--r--yardstick/network_services/nfvi/__init__.py0
-rw-r--r--yardstick/network_services/nfvi/collectd.conf80
-rw-r--r--yardstick/network_services/nfvi/collectd.py158
-rwxr-xr-xyardstick/network_services/nfvi/collectd.sh89
-rw-r--r--yardstick/network_services/nfvi/resource.py162
8 files changed, 820 insertions, 0 deletions
diff --git a/tests/unit/network_services/nfvi/__init__.py b/tests/unit/network_services/nfvi/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/unit/network_services/nfvi/__init__.py
diff --git a/tests/unit/network_services/nfvi/test_collectd.py b/tests/unit/network_services/nfvi/test_collectd.py
new file mode 100644
index 000000000..5bd7196df
--- /dev/null
+++ b/tests/unit/network_services/nfvi/test_collectd.py
@@ -0,0 +1,160 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import multiprocessing
+import mock
+
+from yardstick.network_services.nfvi.collectd import AmqpConsumer
+
+
+class TestAmqpConsumer(unittest.TestCase):
+ def setUp(self):
+ self.queue = multiprocessing.Queue()
+ self.url = 'amqp://admin:admin@1.1.1.1:5672/%2F'
+ self.amqp_consumer = AmqpConsumer(self.url, self.queue)
+
+ def test___init__(self):
+ self.assertEqual(self.url, self.amqp_consumer._url)
+
+ def test_connect(self):
+ self.assertRaises(RuntimeError, self.amqp_consumer.connect)
+
+ def test_on_connection_open(self):
+ self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+ self.amqp_consumer._connection.add_on_close_callback = \
+ mock.Mock(return_value=0)
+ self.amqp_consumer._connection.channel = mock.Mock(return_value=0)
+ self.assertEqual(None, self.amqp_consumer.on_connection_open(10))
+
+ def test_on_connection_closed(self):
+ self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+ self.amqp_consumer._connection.ioloop = mock.Mock()
+ self.amqp_consumer._connection.ioloop.stop = mock.Mock(return_value=0)
+ self.amqp_consumer._connection.add_timeout = mock.Mock(return_value=0)
+ self.amqp_consumer._closing = True
+ self.assertEqual(None,
+ self.amqp_consumer.on_connection_closed("", 404,
+ "Not Found"))
+ self.amqp_consumer._closing = False
+ self.assertEqual(None,
+ self.amqp_consumer.on_connection_closed("", 404,
+ "Not Found"))
+
+ def test_reconnect(self):
+ self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+ self.amqp_consumer._connection.ioloop = mock.Mock()
+ self.amqp_consumer._connection.ioloop.stop = mock.Mock(return_value=0)
+ self.amqp_consumer.connect = mock.Mock(return_value=0)
+ self.amqp_consumer._closing = True
+ self.assertEqual(None, self.amqp_consumer.reconnect())
+
+ def test_on_channel_open(self):
+ self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+ self.amqp_consumer._connection.add_on_close_callback = \
+ mock.Mock(return_value=0)
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer.add_on_channel_close_callback = mock.Mock()
+ self.amqp_consumer._channel.exchange_declare = \
+ mock.Mock(return_value=0)
+ self.assertEqual(None,
+ self.amqp_consumer.on_channel_open(
+ self.amqp_consumer._channel))
+
+ def test_add_on_channel_close_callback(self):
+ self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+ self.amqp_consumer._connection.add_on_close_callback = \
+ mock.Mock(return_value=0)
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer._channel.add_on_close_callback = mock.Mock()
+ self.assertEqual(None,
+ self.amqp_consumer.add_on_channel_close_callback())
+
+ def test_on_channel_closed(self):
+ self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+ self.amqp_consumer._connection.close = mock.Mock(return_value=0)
+ _channel = mock.Mock()
+ self.assertEqual(None,
+ self.amqp_consumer.on_channel_closed(_channel,
+ "", ""))
+
+ def test_ion_exchange_declareok(self):
+ self.amqp_consumer.setup_queue = mock.Mock(return_value=0)
+ self.assertEqual(None, self.amqp_consumer.on_exchange_declareok(10))
+
+ def test_setup_queue(self):
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer._channel.add_on_close_callback = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer.setup_queue("collectd"))
+
+ def test_on_queue_declareok(self):
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer._channel.queue_bind = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer.on_queue_declareok(10))
+
+ def test__on_bindok(self):
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer._channel.basic_consume = mock.Mock()
+ self.amqp_consumer.add_on_cancel_callback = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer._on_bindok(10))
+
+ def test_add_on_cancel_callback(self):
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer._channel.add_on_cancel_callback = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer.add_on_cancel_callback())
+
+ def test_on_consumer_cancelled(self):
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer._channel.close = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer.on_consumer_cancelled(10))
+
+ def test_on_message(self):
+ body = "msg {} cpu/cpu-0/ipc 101010:10"
+ properties = ""
+ basic_deliver = mock.Mock()
+ basic_deliver.delivery_tag = mock.Mock(return_value=0)
+ self.amqp_consumer.ack_message = mock.Mock()
+ self.assertEqual(None,
+ self.amqp_consumer.on_message(10, basic_deliver,
+ properties, body))
+
+ def test_ack_message(self):
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer._channel.basic_ack = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer.ack_message(10))
+
+ def test_on_cancelok(self):
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer._channel.close = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer.on_cancelok(10))
+
+ def test_run(self):
+ self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+ self.amqp_consumer.connect = mock.Mock()
+ self.amqp_consumer._connection.ioloop.start = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer.run())
+
+ def test_stop(self):
+ self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+ self.amqp_consumer.connect = mock.Mock()
+ self.amqp_consumer._connection.ioloop.start = mock.Mock()
+ self.amqp_consumer._channel = mock.Mock()
+ self.amqp_consumer._channel.basic_cancel = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer.stop())
+
+ def test_close_connection(self):
+ self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+ self.amqp_consumer._connection.close = mock.Mock()
+ self.assertEqual(None, self.amqp_consumer.close_connection())
diff --git a/tests/unit/network_services/nfvi/test_resource.py b/tests/unit/network_services/nfvi/test_resource.py
new file mode 100644
index 000000000..90910d89b
--- /dev/null
+++ b/tests/unit/network_services/nfvi/test_resource.py
@@ -0,0 +1,171 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import multiprocessing
+import mock
+
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.nfvi import resource, collectd
+
+
+class TestResourceProfile(unittest.TestCase):
+ VNFD = {'vnfd:vnfd-catalog':
+ {'vnfd':
+ [{'short-name': 'VpeVnf',
+ 'vdu':
+ [{'routing_table':
+ [{'network': '152.16.100.20',
+ 'netmask': '255.255.255.0',
+ 'gateway': '152.16.100.20',
+ 'if': 'xe0'},
+ {'network': '152.16.40.20',
+ 'netmask': '255.255.255.0',
+ 'gateway': '152.16.40.20',
+ 'if': 'xe1'}],
+ 'description': 'VPE approximation using DPDK',
+ 'name': 'vpevnf-baremetal',
+ 'nd_route_tbl':
+ [{'network': '0064:ff9b:0:0:0:0:9810:6414',
+ 'netmask': '112',
+ 'gateway': '0064:ff9b:0:0:0:0:9810:6414',
+ 'if': 'xe0'},
+ {'network': '0064:ff9b:0:0:0:0:9810:2814',
+ 'netmask': '112',
+ 'gateway': '0064:ff9b:0:0:0:0:9810:2814',
+ 'if': 'xe1'}],
+ 'id': 'vpevnf-baremetal',
+ 'external-interface':
+ [{'virtual-interface':
+ {'dst_mac': '3c:fd:fe:9e:64:38',
+ 'vpci': '0000:05:00.0',
+ 'local_ip': '152.16.100.19',
+ 'type': 'PCI-PASSTHROUGH',
+ 'netmask': '255.255.255.0',
+ 'dpdk_port_num': '0',
+ 'bandwidth': '10 Gbps',
+ 'dst_ip': '152.16.100.20',
+ 'local_mac': '3c:fd:fe:a1:2b:80'},
+ 'vnfd-connection-point-ref': 'xe0',
+ 'name': 'xe0'},
+ {'virtual-interface':
+ {'dst_mac': '00:1e:67:d0:60:5c',
+ 'vpci': '0000:05:00.1',
+ 'local_ip': '152.16.40.19',
+ 'type': 'PCI-PASSTHROUGH',
+ 'netmask': '255.255.255.0',
+ 'dpdk_port_num': '1',
+ 'bandwidth': '10 Gbps',
+ 'dst_ip': '152.16.40.20',
+ 'local_mac': '3c:fd:fe:a1:2b:81'},
+ 'vnfd-connection-point-ref': 'xe1',
+ 'name': 'xe1'}]}],
+ 'description': 'Vpe approximation using DPDK',
+ 'mgmt-interface':
+ {'vdu-id': 'vpevnf-baremetal',
+ 'host': '1.1.1.1',
+ 'password': 'r00t',
+ 'user': 'root',
+ 'ip': '1.1.1.1'},
+ 'benchmark':
+ {'kpi': ['packets_in', 'packets_fwd', 'packets_dropped']},
+ 'connection-point': [{'type': 'VPORT', 'name': 'xe0'},
+ {'type': 'VPORT', 'name': 'xe1'}],
+ 'id': 'VpeApproxVnf', 'name': 'VPEVnfSsh'}]}}
+
+ def setUp(self):
+ with mock.patch("yardstick.ssh.SSH") as ssh:
+ ssh_mock = mock.Mock(autospec=ssh.SSH)
+ ssh_mock.execute = \
+ mock.Mock(return_value=(0, {}, ""))
+ ssh.return_value = ssh_mock
+
+ self.resource_profile = \
+ ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0],
+ [1, 2, 3])
+
+ def test___init__(self):
+ self.assertEqual(True, self.resource_profile.enable)
+
+ def test_check_if_sa_running(self):
+ self.assertEqual(self.resource_profile.check_if_sa_running("collectd"),
+ [True, {}])
+
+ def test_amqp_collect_nfvi_kpi(self):
+ _queue = multiprocessing.Queue()
+ _queue.put({"cpu/cpu-0/ipc": "ipc:10"})
+ amqp = self.resource_profile.amqp_collect_nfvi_kpi(_queue)
+ _queue.put({"/memory/bandwidth": "local:10"})
+ amqp = self.resource_profile.amqp_collect_nfvi_kpi(_queue)
+ expected = {'timestamp': '', 'cpu': {}, 'memory': {'bandwidth': '10'}}
+ self.assertDictEqual(expected, amqp)
+
+ def test_amqp_collect_nfvi_kpi_exception(self):
+ amqp = self.resource_profile.amqp_collect_nfvi_kpi({})
+ self.assertDictEqual({}, amqp)
+
+ def test_get_cpu_data(self):
+ reskey = ["", "cpufreq", "cpufreq-0"]
+ value = "metric:10"
+ val = self.resource_profile.get_cpu_data(reskey, value)
+ self.assertEqual(val, ['0', 'cpufreq', '10', 'metric'])
+
+ def test_get_cpu_data_error(self):
+ reskey = ["", "", ""]
+ value = "metric:10"
+ val = self.resource_profile.get_cpu_data(reskey, value)
+ self.assertEqual(val, ['error', 'Invalid', ''])
+
+ def test__start_collectd(self):
+ with mock.patch("yardstick.ssh.SSH") as ssh:
+ ssh_mock = mock.Mock(autospec=ssh.SSH)
+ ssh_mock.execute = \
+ mock.Mock(return_value=(0, "", ""))
+ ssh.return_value = ssh_mock
+ resource_profile = \
+ ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0],
+ [1, 2, 3])
+ self.assertIsNone(
+ resource_profile._start_collectd(ssh_mock, "/opt/nsb_bin"))
+
+ def test_initiate_systemagent(self):
+ with mock.patch("yardstick.ssh.SSH") as ssh:
+ ssh_mock = mock.Mock(autospec=ssh.SSH)
+ ssh_mock.execute = \
+ mock.Mock(return_value=(0, "", ""))
+ ssh.return_value = ssh_mock
+ resource_profile = \
+ ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0],
+ [1, 2, 3])
+ self.assertIsNone(
+ resource_profile.initiate_systemagent("/opt/nsb_bin"))
+
+ def test_parse_collectd_result(self):
+ res = self.resource_profile.parse_collectd_result({}, [0, 1, 2])
+ self.assertDictEqual(res, {'timestamp': '', 'cpu': {}, 'memory': {}})
+
+ def test_run_collectd_amqp(self):
+ _queue = multiprocessing.Queue()
+ resource.AmqpConsumer = mock.Mock(autospec=collectd)
+ self.assertIsNone(self.resource_profile.run_collectd_amqp(_queue))
+
+ def test_start(self):
+ self.assertIsNone(self.resource_profile.start())
+
+ def test_stop(self):
+ self.assertIsNone(self.resource_profile.stop())
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/yardstick/network_services/nfvi/__init__.py b/yardstick/network_services/nfvi/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/yardstick/network_services/nfvi/__init__.py
diff --git a/yardstick/network_services/nfvi/collectd.conf b/yardstick/network_services/nfvi/collectd.conf
new file mode 100644
index 000000000..abcf24ded
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.conf
@@ -0,0 +1,80 @@
+# Config file for collectd(1).
+#
+# Some plugins need additional configuration and are disabled by default.
+# Please read collectd.conf(5) for details.
+#
+# You should also read /usr/share/doc/collectd-core/README.Debian.plugins
+# before enabling any more plugins.
+
+##############################################################################
+# Global #
+#----------------------------------------------------------------------------#
+# Global settings for the daemon. #
+##############################################################################
+
+Hostname "nsb_stats"
+FQDNLookup true
+
+Interval 5
+
+##############################################################################
+# LoadPlugin section #
+#----------------------------------------------------------------------------#
+# Specify what features to activate. #
+##############################################################################
+
+LoadPlugin amqp
+LoadPlugin cpu
+LoadPlugin intel_rdt
+LoadPlugin memory
+
+##############################################################################
+# Plugin configuration #
+#----------------------------------------------------------------------------#
+# In this section configuration stubs for each plugin are provided. A desc- #
+# ription of those options is available in the collectd.conf(5) manual page. #
+##############################################################################
+
+<Plugin amqp>
+ <Publish "name">
+ Host "0.0.0.0"
+ Port "5672"
+ VHost "/"
+ User "admin"
+ Password "admin"
+ Exchange "amq.fanout"
+ RoutingKey "collectd"
+ Persistent false
+ StoreRates false
+ ConnectionRetryDelay 0
+ </Publish>
+</Plugin>
+
+<Plugin cpu>
+ ReportByCpu true
+ ReportByState true
+ ValuesPercentage false
+</Plugin>
+
+<Plugin memory>
+ ValuesAbsolute true
+ ValuesPercentage false
+</Plugin>
+
+<LoadPlugin intel_rdt>
+ Interval 5
+</LoadPlugin>
+<Plugin "intel_rdt">
+ Cores ""
+</Plugin>
+
+<Plugin memcached>
+ <Instance "local">
+ Host "127.0.0.1"
+ Port "11211"
+ </Instance>
+</Plugin>
+
+<Include "/etc/collectd/collectd.conf.d">
+ Filter "*.conf"
+</Include>
diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py
new file mode 100644
index 000000000..ea80e4ff8
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.py
@@ -0,0 +1,158 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" AMQP Consumer senario definition """
+
+from __future__ import absolute_import
+from __future__ import print_function
+import logging
+import pika
+from pika.exceptions import AMQPConnectionError
+
+
+class AmqpConsumer(object):
+ """ This Class handles amqp consumer and collects collectd data """
+ EXCHANGE = 'amq.fanout'
+ EXCHANGE_TYPE = 'fanout'
+ QUEUE = ''
+ ROUTING_KEY = 'collectd'
+
+ def __init__(self, amqp_url, queue):
+ self._connection = None
+ self._channel = None
+ self._closing = False
+ self._consumer_tag = None
+ self._url = amqp_url
+ self._queue = queue
+
+ def connect(self):
+ """ connect to amqp url """
+ try:
+ return pika.SelectConnection(pika.URLParameters(self._url),
+ self.on_connection_open,
+ stop_ioloop_on_close=False)
+ except AMQPConnectionError:
+ raise RuntimeError
+
+ def on_connection_open(self, unused_connection):
+ """ call back from pika & open channel """
+ logging.info("list of unused connections %s", unused_connection)
+ self._connection.add_on_close_callback(self.on_connection_closed)
+ self._connection.channel(on_open_callback=self.on_channel_open)
+
+ def on_connection_closed(self, connection, reply_code, reply_text):
+ """ close the amqp connections. if force close, try re-connect """
+ logging.info("amqp connection (%s)", connection)
+ self._channel = None
+ if self._closing:
+ self._connection.ioloop.stop()
+ else:
+ logging.debug(('Connection closed, reopening in 5 sec: (%s) %s',
+ reply_code, reply_text))
+ self._connection.add_timeout(5, self.reconnect)
+
+ def reconnect(self):
+ """ re-connect amqp consumer"""
+ self._connection.ioloop.stop()
+
+ if not self._closing:
+ self._connection = self.connect()
+ self._connection.ioloop.start()
+
+ def on_channel_open(self, channel):
+ """ add close callback & setup exchange """
+ self._channel = channel
+ self.add_on_channel_close_callback()
+ self._channel.exchange_declare(self.on_exchange_declareok,
+ self.EXCHANGE,
+ self.EXCHANGE_TYPE,
+ durable=True, auto_delete=False)
+
+ def add_on_channel_close_callback(self):
+ """ register for close callback """
+ self._channel.add_on_close_callback(self.on_channel_closed)
+
+ def on_channel_closed(self, channel, reply_code, reply_text):
+ """ close amqp channel connection """
+ logging.info("amqp channel closed channel(%s), "
+ "reply_code(%s) reply_text(%s)",
+ channel, reply_code, reply_text)
+ self._connection.close()
+
+ def on_exchange_declareok(self, unused_frame):
+ """ if exchange declare is ok, setup queue """
+ logging.info("amqp exchange unused frame (%s)", unused_frame)
+ self.setup_queue(self.QUEUE)
+
+ def setup_queue(self, queue_name):
+ """ setup queue & declare same with channel """
+ logging.info("amqp queue name (%s)", queue_name)
+ self._channel.queue_declare(self.on_queue_declareok, queue_name)
+
+ def on_queue_declareok(self, method_frame):
+ """ bind queue to channel """
+ logging.info("amqp queue method frame (%s)", method_frame)
+ self._channel.queue_bind(self._on_bindok, self.QUEUE,
+ self.EXCHANGE, self.ROUTING_KEY)
+
+ def _on_bindok(self, unused_frame):
+ """ call back on bind start consuming data from amqp queue """
+ logging.info("amqp unused frame %s", unused_frame)
+ self.add_on_cancel_callback()
+ self._consumer_tag = self._channel.basic_consume(self.on_message,
+ self.QUEUE)
+
+ def add_on_cancel_callback(self):
+ """ add cancel func to amqp callback """
+ self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
+
+ def on_consumer_cancelled(self, method_frame):
+ """ on cancel close the channel """
+ logging.info("amqp method frame %s", method_frame)
+ if self._channel:
+ self._channel.close()
+
+ def on_message(self, unused_channel, basic_deliver, properties, body):
+ """ parse received data from amqp server (collectd) """
+ logging.info("amqp unused channel %s, properties %s",
+ unused_channel, properties)
+ metrics = body.rsplit()
+ self._queue.put({metrics[1]: metrics[3]})
+ self.ack_message(basic_deliver.delivery_tag)
+
+ def ack_message(self, delivery_tag):
+ """ acknowledge amqp msg """
+ self._channel.basic_ack(delivery_tag)
+
+ def on_cancelok(self, unused_frame):
+ """ initiate amqp close channel on callback """
+ logging.info("amqp unused frame %s", unused_frame)
+ self._channel.close()
+
+ def run(self):
+ """ Initiate amqp connection. """
+ self._connection = self.connect()
+ self._connection.ioloop.start()
+
+ def stop(self):
+ """ stop amqp consuming data """
+ self._closing = True
+ if self._channel:
+ self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
+
+ if self._connection:
+ self._connection.ioloop.start()
+
+ def close_connection(self):
+ """ close amqp connection """
+ self._connection.close()
diff --git a/yardstick/network_services/nfvi/collectd.sh b/yardstick/network_services/nfvi/collectd.sh
new file mode 100755
index 000000000..7acb40431
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.sh
@@ -0,0 +1,89 @@
+#!/bin/bash
+#
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+INSTALL_NSB_BIN="/opt/nsb_bin"
+cd $INSTALL_NSB_BIN
+
+if [ "$(whoami)" != "root" ]; then
+ echo "Must be root to run $0"
+ exit 1;
+fi
+
+echo "Install required libraries to run collectd..."
+pkg=(git flex bison build-essential pkg-config automake autotools-dev libltdl-dev librabbitmq-dev rabbitmq-server)
+for i in "${pkg[@]}"; do
+dpkg-query -W --showformat='${Status}\n' "${i}"|grep "install ok installed"
+ if [ "$?" -eq "1" ]; then
+ apt-get -y install "${i}";
+ fi
+done
+echo "Done"
+
+ldconfig -p | grep libpqos >/dev/null
+if [ $? -eq 0 ]
+then
+ echo "Intel RDT library already installed. Done"
+else
+ pushd .
+
+ echo "Get intel_rdt repo and install..."
+ rm -rf intel-cmt-cat >/dev/null
+ git clone https://github.com/01org/intel-cmt-cat.git
+ pushd intel-cmt-cat
+ git checkout tags/v1.5 -b v1.5
+ make install PREFIX=/usr
+ popd
+
+ popd
+ echo "Done."
+fi
+
+which /opt/nsb_bin/collectd/collectd >/dev/null
+if [ $? -eq 0 ]
+then
+ echo "Collectd already installed. Done"
+else
+ pushd .
+ echo "Get collectd from repository and install..."
+ rm -rf collectd >/dev/null
+ git clone https://github.com/collectd/collectd.git
+ pushd collectd
+ git stash
+ git checkout -b collectd 43a4db3b3209f497a0ba408aebf8aee385c6262d
+ ./build.sh
+ ./configure --with-libpqos=/usr/
+ make install > /dev/null
+ popd
+ echo "Done."
+ popd
+fi
+
+modprobe msr
+cp $INSTALL_NSB_BIN/collectd.conf /opt/collectd/etc/
+
+echo "Check if admin user already created"
+rabbitmqctl list_users | grep '^admin$' > /dev/null
+if [ $? -eq 0 ];
+then
+ echo "'admin' user already created..."
+else
+ echo "Creating 'admin' user for collectd data export..."
+ rabbitmqctl delete_user guest
+ rabbitmqctl add_user admin admin
+ rabbitmqctl authenticate_user admin admin
+ rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
+ echo "Done"
+fi
diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py
new file mode 100644
index 000000000..d71e1e995
--- /dev/null
+++ b/yardstick/network_services/nfvi/resource.py
@@ -0,0 +1,162 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Resource collection definitions """
+
+from __future__ import absolute_import
+import logging
+import os.path
+import re
+import multiprocessing
+from oslo_config import cfg
+
+from yardstick import ssh
+from yardstick.network_services.nfvi.collectd import AmqpConsumer
+from yardstick.network_services.utils import provision_tool
+
+CONF = cfg.CONF
+ZMQ_OVS_PORT = 5567
+ZMQ_POLLING_TIME = 12000
+
+
+class ResourceProfile(object):
+ """
+ This profile adds a resource at the beginning of the test session
+ """
+
+ def __init__(self, vnfd, cores):
+ self.enable = True
+ self.connection = None
+ self.cores = cores
+
+ mgmt_interface = vnfd.get("mgmt-interface")
+ user = mgmt_interface.get("user")
+ passwd = mgmt_interface.get("password")
+ ip_addr = mgmt_interface.get("ip")
+ self.vnfip = mgmt_interface.get("host", ip_addr)
+ ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT)
+ self.connection = ssh.SSH(user, self.vnfip,
+ password=passwd, port=ssh_port)
+ self.connection.wait()
+
+ def check_if_sa_running(self, process):
+ """ verify if system agent is running """
+ err, pid, _ = self.connection.execute("pgrep -f %s" % process)
+ return [err == 0, pid]
+
+ def run_collectd_amqp(self, queue):
+ """ run amqp consumer to collect the NFVi data """
+ amqp = \
+ AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip),
+ queue)
+ try:
+ amqp.run()
+ except (AttributeError, RuntimeError, KeyboardInterrupt):
+ amqp.stop()
+
+ @classmethod
+ def get_cpu_data(cls, reskey, value):
+ """ Get cpu topology of the host """
+ pattern = r"-(\d+)"
+ if "cpufreq" in reskey[1]:
+ match = re.search(pattern, reskey[2], re.MULTILINE)
+ metric = reskey[1]
+ else:
+ match = re.search(pattern, reskey[1], re.MULTILINE)
+ metric = reskey[2]
+
+ time, val = re.split(":", value)
+ if match:
+ return [str(match.group(1)), metric, val, time]
+
+ return ["error", "Invalid", ""]
+
+ def parse_collectd_result(self, metrics, listcores):
+ """ convert collectd data into json"""
+ res = {"cpu": {}, "memory": {}}
+ testcase = ""
+
+ for key, value in metrics.items():
+ reskey = key.rsplit("/")
+ if "cpu" in reskey[1] or "intel_rdt" in reskey[1]:
+ cpu_key, name, metric, testcase = \
+ self.get_cpu_data(reskey, value)
+ if cpu_key in listcores:
+ res["cpu"].setdefault(cpu_key, {}).update({name: metric})
+ elif "memory" in reskey[1]:
+ val = re.split(":", value)[1]
+ res["memory"].update({reskey[2]: val})
+ res["timestamp"] = testcase
+
+ return res
+
+ def amqp_collect_nfvi_kpi(self, _queue=multiprocessing.Queue()):
+ """ amqp collect and return nfvi kpis """
+ try:
+ metric = {}
+ amqp_client = \
+ multiprocessing.Process(target=self.run_collectd_amqp,
+ args=(_queue,))
+ amqp_client.start()
+ amqp_client.join(7)
+ amqp_client.terminate()
+
+ while not _queue.empty():
+ metric.update(_queue.get())
+ except (AttributeError, RuntimeError, TypeError, ValueError):
+ logging.debug("Failed to get NFVi stats...")
+ msg = {}
+ else:
+ msg = self.parse_collectd_result(metric, self.cores)
+
+ return msg
+
+ @classmethod
+ def _start_collectd(cls, connection, bin_path):
+ connection.execute('pkill -9 collectd')
+ collectd = os.path.join(bin_path, "collectd.sh")
+ provision_tool(connection, collectd)
+ provision_tool(connection, os.path.join(bin_path, "collectd.conf"))
+
+ # Reset amqp queue
+ connection.execute("sudo service rabbitmq-server start")
+ connection.execute("sudo rabbitmqctl stop_app")
+ connection.execute("sudo rabbitmqctl reset")
+ connection.execute("sudo rabbitmqctl start_app")
+ connection.execute("sudo service rabbitmq-server restart")
+
+ # Run collectd
+ connection.execute(collectd)
+ connection.execute(os.path.join(bin_path, "collectd", "collectd"))
+
+ def initiate_systemagent(self, bin_path):
+ """ Start system agent for NFVi collection on host """
+ if self.enable:
+ self._start_collectd(self.connection, bin_path)
+
+ def start(self):
+ """ start nfvi collection """
+ if self.enable:
+ logging.debug("Start NVFi metric collection...")
+
+ def stop(self):
+ """ stop nfvi collection """
+ if self.enable:
+ agent = "collectd"
+ logging.debug("Stop resource monitor...")
+ status, pid = self.check_if_sa_running(agent)
+ if status:
+ self.connection.execute('kill -9 %s' % pid)
+ self.connection.execute('pkill -9 %s' % agent)
+ self.connection.execute('service rabbitmq-server stop')
+ self.connection.execute("sudo rabbitmqctl stop_app")