From ddb76faa5841997bd3eec4ed2f3d33f56e66d0c3 Mon Sep 17 00:00:00 2001
From: Deepak S <deepak.s@linux.intel.com>
Date: Fri, 30 Dec 2016 09:22:25 -0800
Subject: Add infrastructure to add the NFVi KPI collections

 This patches added common function to collect NFVi KPIs for given usecases
 - Core KPIs like memory/LLC/IPC etc
 - OVS stats
 - memory stats etc.

JIRA: YARDSTICK-488
Change-Id: Iab41146392efc47b7313b1846a67728a44d0f1d6
Signed-off-by: Deepak S <deepak.s@linux.intel.com>
---
 tests/unit/network_services/nfvi/__init__.py      |   0
 tests/unit/network_services/nfvi/test_collectd.py | 160 ++++++++++++++++++++
 tests/unit/network_services/nfvi/test_resource.py | 171 ++++++++++++++++++++++
 yardstick/network_services/nfvi/__init__.py       |   0
 yardstick/network_services/nfvi/collectd.conf     |  80 ++++++++++
 yardstick/network_services/nfvi/collectd.py       | 158 ++++++++++++++++++++
 yardstick/network_services/nfvi/collectd.sh       |  89 +++++++++++
 yardstick/network_services/nfvi/resource.py       | 162 ++++++++++++++++++++
 8 files changed, 820 insertions(+)
 create mode 100644 tests/unit/network_services/nfvi/__init__.py
 create mode 100644 tests/unit/network_services/nfvi/test_collectd.py
 create mode 100644 tests/unit/network_services/nfvi/test_resource.py
 create mode 100644 yardstick/network_services/nfvi/__init__.py
 create mode 100644 yardstick/network_services/nfvi/collectd.conf
 create mode 100644 yardstick/network_services/nfvi/collectd.py
 create mode 100755 yardstick/network_services/nfvi/collectd.sh
 create mode 100644 yardstick/network_services/nfvi/resource.py

diff --git a/tests/unit/network_services/nfvi/__init__.py b/tests/unit/network_services/nfvi/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/unit/network_services/nfvi/test_collectd.py b/tests/unit/network_services/nfvi/test_collectd.py
new file mode 100644
index 000000000..5bd7196df
--- /dev/null
+++ b/tests/unit/network_services/nfvi/test_collectd.py
@@ -0,0 +1,160 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import multiprocessing
+import mock
+
+from yardstick.network_services.nfvi.collectd import AmqpConsumer
+
+
+class TestAmqpConsumer(unittest.TestCase):
+    def setUp(self):
+        self.queue = multiprocessing.Queue()
+        self.url = 'amqp://admin:admin@1.1.1.1:5672/%2F'
+        self.amqp_consumer = AmqpConsumer(self.url, self.queue)
+
+    def test___init__(self):
+        self.assertEqual(self.url, self.amqp_consumer._url)
+
+    def test_connect(self):
+        self.assertRaises(RuntimeError, self.amqp_consumer.connect)
+
+    def test_on_connection_open(self):
+        self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+        self.amqp_consumer._connection.add_on_close_callback = \
+            mock.Mock(return_value=0)
+        self.amqp_consumer._connection.channel = mock.Mock(return_value=0)
+        self.assertEqual(None, self.amqp_consumer.on_connection_open(10))
+
+    def test_on_connection_closed(self):
+        self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+        self.amqp_consumer._connection.ioloop = mock.Mock()
+        self.amqp_consumer._connection.ioloop.stop = mock.Mock(return_value=0)
+        self.amqp_consumer._connection.add_timeout = mock.Mock(return_value=0)
+        self.amqp_consumer._closing = True
+        self.assertEqual(None,
+                         self.amqp_consumer.on_connection_closed("", 404,
+                                                                 "Not Found"))
+        self.amqp_consumer._closing = False
+        self.assertEqual(None,
+                         self.amqp_consumer.on_connection_closed("", 404,
+                                                                 "Not Found"))
+
+    def test_reconnect(self):
+        self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+        self.amqp_consumer._connection.ioloop = mock.Mock()
+        self.amqp_consumer._connection.ioloop.stop = mock.Mock(return_value=0)
+        self.amqp_consumer.connect = mock.Mock(return_value=0)
+        self.amqp_consumer._closing = True
+        self.assertEqual(None, self.amqp_consumer.reconnect())
+
+    def test_on_channel_open(self):
+        self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+        self.amqp_consumer._connection.add_on_close_callback = \
+            mock.Mock(return_value=0)
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer.add_on_channel_close_callback = mock.Mock()
+        self.amqp_consumer._channel.exchange_declare = \
+            mock.Mock(return_value=0)
+        self.assertEqual(None,
+                         self.amqp_consumer.on_channel_open(
+                             self.amqp_consumer._channel))
+
+    def test_add_on_channel_close_callback(self):
+        self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+        self.amqp_consumer._connection.add_on_close_callback = \
+            mock.Mock(return_value=0)
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer._channel.add_on_close_callback = mock.Mock()
+        self.assertEqual(None,
+                         self.amqp_consumer.add_on_channel_close_callback())
+
+    def test_on_channel_closed(self):
+        self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+        self.amqp_consumer._connection.close = mock.Mock(return_value=0)
+        _channel = mock.Mock()
+        self.assertEqual(None,
+                         self.amqp_consumer.on_channel_closed(_channel,
+                                                              "", ""))
+
+    def test_ion_exchange_declareok(self):
+        self.amqp_consumer.setup_queue = mock.Mock(return_value=0)
+        self.assertEqual(None, self.amqp_consumer.on_exchange_declareok(10))
+
+    def test_setup_queue(self):
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer._channel.add_on_close_callback = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer.setup_queue("collectd"))
+
+    def test_on_queue_declareok(self):
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer._channel.queue_bind = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer.on_queue_declareok(10))
+
+    def test__on_bindok(self):
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer._channel.basic_consume = mock.Mock()
+        self.amqp_consumer.add_on_cancel_callback = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer._on_bindok(10))
+
+    def test_add_on_cancel_callback(self):
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer._channel.add_on_cancel_callback = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer.add_on_cancel_callback())
+
+    def test_on_consumer_cancelled(self):
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer._channel.close = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer.on_consumer_cancelled(10))
+
+    def test_on_message(self):
+        body = "msg {} cpu/cpu-0/ipc 101010:10"
+        properties = ""
+        basic_deliver = mock.Mock()
+        basic_deliver.delivery_tag = mock.Mock(return_value=0)
+        self.amqp_consumer.ack_message = mock.Mock()
+        self.assertEqual(None,
+                         self.amqp_consumer.on_message(10, basic_deliver,
+                                                       properties, body))
+
+    def test_ack_message(self):
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer._channel.basic_ack = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer.ack_message(10))
+
+    def test_on_cancelok(self):
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer._channel.close = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer.on_cancelok(10))
+
+    def test_run(self):
+        self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+        self.amqp_consumer.connect = mock.Mock()
+        self.amqp_consumer._connection.ioloop.start = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer.run())
+
+    def test_stop(self):
+        self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+        self.amqp_consumer.connect = mock.Mock()
+        self.amqp_consumer._connection.ioloop.start = mock.Mock()
+        self.amqp_consumer._channel = mock.Mock()
+        self.amqp_consumer._channel.basic_cancel = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer.stop())
+
+    def test_close_connection(self):
+        self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
+        self.amqp_consumer._connection.close = mock.Mock()
+        self.assertEqual(None, self.amqp_consumer.close_connection())
diff --git a/tests/unit/network_services/nfvi/test_resource.py b/tests/unit/network_services/nfvi/test_resource.py
new file mode 100644
index 000000000..90910d89b
--- /dev/null
+++ b/tests/unit/network_services/nfvi/test_resource.py
@@ -0,0 +1,171 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import multiprocessing
+import mock
+
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.nfvi import resource, collectd
+
+
+class TestResourceProfile(unittest.TestCase):
+    VNFD = {'vnfd:vnfd-catalog':
+            {'vnfd':
+             [{'short-name': 'VpeVnf',
+               'vdu':
+               [{'routing_table':
+                 [{'network': '152.16.100.20',
+                   'netmask': '255.255.255.0',
+                   'gateway': '152.16.100.20',
+                   'if': 'xe0'},
+                  {'network': '152.16.40.20',
+                   'netmask': '255.255.255.0',
+                   'gateway': '152.16.40.20',
+                   'if': 'xe1'}],
+                 'description': 'VPE approximation using DPDK',
+                 'name': 'vpevnf-baremetal',
+                 'nd_route_tbl':
+                 [{'network': '0064:ff9b:0:0:0:0:9810:6414',
+                   'netmask': '112',
+                   'gateway': '0064:ff9b:0:0:0:0:9810:6414',
+                   'if': 'xe0'},
+                  {'network': '0064:ff9b:0:0:0:0:9810:2814',
+                   'netmask': '112',
+                   'gateway': '0064:ff9b:0:0:0:0:9810:2814',
+                   'if': 'xe1'}],
+                 'id': 'vpevnf-baremetal',
+                 'external-interface':
+                 [{'virtual-interface':
+                   {'dst_mac': '3c:fd:fe:9e:64:38',
+                    'vpci': '0000:05:00.0',
+                    'local_ip': '152.16.100.19',
+                    'type': 'PCI-PASSTHROUGH',
+                    'netmask': '255.255.255.0',
+                    'dpdk_port_num': '0',
+                    'bandwidth': '10 Gbps',
+                    'dst_ip': '152.16.100.20',
+                    'local_mac': '3c:fd:fe:a1:2b:80'},
+                   'vnfd-connection-point-ref': 'xe0',
+                   'name': 'xe0'},
+                  {'virtual-interface':
+                   {'dst_mac': '00:1e:67:d0:60:5c',
+                    'vpci': '0000:05:00.1',
+                    'local_ip': '152.16.40.19',
+                    'type': 'PCI-PASSTHROUGH',
+                    'netmask': '255.255.255.0',
+                    'dpdk_port_num': '1',
+                    'bandwidth': '10 Gbps',
+                    'dst_ip': '152.16.40.20',
+                    'local_mac': '3c:fd:fe:a1:2b:81'},
+                   'vnfd-connection-point-ref': 'xe1',
+                   'name': 'xe1'}]}],
+               'description': 'Vpe approximation using DPDK',
+               'mgmt-interface':
+                   {'vdu-id': 'vpevnf-baremetal',
+                    'host': '1.1.1.1',
+                    'password': 'r00t',
+                    'user': 'root',
+                    'ip': '1.1.1.1'},
+               'benchmark':
+                   {'kpi': ['packets_in', 'packets_fwd', 'packets_dropped']},
+               'connection-point': [{'type': 'VPORT', 'name': 'xe0'},
+                                    {'type': 'VPORT', 'name': 'xe1'}],
+               'id': 'VpeApproxVnf', 'name': 'VPEVnfSsh'}]}}
+
+    def setUp(self):
+        with mock.patch("yardstick.ssh.SSH") as ssh:
+            ssh_mock = mock.Mock(autospec=ssh.SSH)
+            ssh_mock.execute = \
+                mock.Mock(return_value=(0, {}, ""))
+            ssh.return_value = ssh_mock
+
+            self.resource_profile = \
+                ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0],
+                                [1, 2, 3])
+
+    def test___init__(self):
+        self.assertEqual(True, self.resource_profile.enable)
+
+    def test_check_if_sa_running(self):
+        self.assertEqual(self.resource_profile.check_if_sa_running("collectd"),
+                         [True, {}])
+
+    def test_amqp_collect_nfvi_kpi(self):
+        _queue = multiprocessing.Queue()
+        _queue.put({"cpu/cpu-0/ipc": "ipc:10"})
+        amqp = self.resource_profile.amqp_collect_nfvi_kpi(_queue)
+        _queue.put({"/memory/bandwidth": "local:10"})
+        amqp = self.resource_profile.amqp_collect_nfvi_kpi(_queue)
+        expected = {'timestamp': '', 'cpu': {}, 'memory': {'bandwidth': '10'}}
+        self.assertDictEqual(expected, amqp)
+
+    def test_amqp_collect_nfvi_kpi_exception(self):
+        amqp = self.resource_profile.amqp_collect_nfvi_kpi({})
+        self.assertDictEqual({}, amqp)
+
+    def test_get_cpu_data(self):
+        reskey = ["", "cpufreq", "cpufreq-0"]
+        value = "metric:10"
+        val = self.resource_profile.get_cpu_data(reskey, value)
+        self.assertEqual(val, ['0', 'cpufreq', '10', 'metric'])
+
+    def test_get_cpu_data_error(self):
+        reskey = ["", "", ""]
+        value = "metric:10"
+        val = self.resource_profile.get_cpu_data(reskey, value)
+        self.assertEqual(val, ['error', 'Invalid', ''])
+
+    def test__start_collectd(self):
+        with mock.patch("yardstick.ssh.SSH") as ssh:
+            ssh_mock = mock.Mock(autospec=ssh.SSH)
+            ssh_mock.execute = \
+                mock.Mock(return_value=(0, "", ""))
+            ssh.return_value = ssh_mock
+            resource_profile = \
+                ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0],
+                                [1, 2, 3])
+            self.assertIsNone(
+                resource_profile._start_collectd(ssh_mock, "/opt/nsb_bin"))
+
+    def test_initiate_systemagent(self):
+        with mock.patch("yardstick.ssh.SSH") as ssh:
+            ssh_mock = mock.Mock(autospec=ssh.SSH)
+            ssh_mock.execute = \
+                mock.Mock(return_value=(0, "", ""))
+            ssh.return_value = ssh_mock
+            resource_profile = \
+                ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0],
+                                [1, 2, 3])
+            self.assertIsNone(
+                resource_profile.initiate_systemagent("/opt/nsb_bin"))
+
+    def test_parse_collectd_result(self):
+        res = self.resource_profile.parse_collectd_result({}, [0, 1, 2])
+        self.assertDictEqual(res, {'timestamp': '', 'cpu': {}, 'memory': {}})
+
+    def test_run_collectd_amqp(self):
+        _queue = multiprocessing.Queue()
+        resource.AmqpConsumer = mock.Mock(autospec=collectd)
+        self.assertIsNone(self.resource_profile.run_collectd_amqp(_queue))
+
+    def test_start(self):
+        self.assertIsNone(self.resource_profile.start())
+
+    def test_stop(self):
+        self.assertIsNone(self.resource_profile.stop())
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/yardstick/network_services/nfvi/__init__.py b/yardstick/network_services/nfvi/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/yardstick/network_services/nfvi/collectd.conf b/yardstick/network_services/nfvi/collectd.conf
new file mode 100644
index 000000000..abcf24ded
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.conf
@@ -0,0 +1,80 @@
+# Config file for collectd(1).
+#
+# Some plugins need additional configuration and are disabled by default.
+# Please read collectd.conf(5) for details.
+#
+# You should also read /usr/share/doc/collectd-core/README.Debian.plugins
+# before enabling any more plugins.
+
+##############################################################################
+# Global                                                                     #
+#----------------------------------------------------------------------------#
+# Global settings for the daemon.                                            #
+##############################################################################
+
+Hostname "nsb_stats"
+FQDNLookup true
+
+Interval 5
+
+##############################################################################
+# LoadPlugin section                                                         #
+#----------------------------------------------------------------------------#
+# Specify what features to activate.                                         #
+##############################################################################
+
+LoadPlugin amqp
+LoadPlugin cpu
+LoadPlugin intel_rdt
+LoadPlugin memory
+
+##############################################################################
+# Plugin configuration                                                       #
+#----------------------------------------------------------------------------#
+# In this section configuration stubs for each plugin are provided. A desc-  #
+# ription of those options is available in the collectd.conf(5) manual page. #
+##############################################################################
+
+<Plugin amqp>
+	<Publish "name">
+		Host "0.0.0.0"
+		Port "5672"
+		VHost "/"
+		User "admin"
+		Password "admin"
+		Exchange "amq.fanout"
+		RoutingKey "collectd"
+		Persistent false
+		StoreRates false
+		ConnectionRetryDelay 0
+	</Publish>
+</Plugin>
+
+<Plugin cpu>
+	ReportByCpu true
+	ReportByState true
+	ValuesPercentage false
+</Plugin>
+
+<Plugin memory>
+	ValuesAbsolute true
+	ValuesPercentage false
+</Plugin>
+
+<LoadPlugin intel_rdt>
+  Interval 5
+</LoadPlugin>
+<Plugin "intel_rdt">
+  Cores ""
+</Plugin>
+
+<Plugin memcached>
+	<Instance "local">
+		Host "127.0.0.1"
+		Port "11211"
+	</Instance>
+</Plugin>
+
+<Include "/etc/collectd/collectd.conf.d">
+	Filter "*.conf"
+</Include>
diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py
new file mode 100644
index 000000000..ea80e4ff8
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.py
@@ -0,0 +1,158 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" AMQP Consumer senario definition """
+
+from __future__ import absolute_import
+from __future__ import print_function
+import logging
+import pika
+from pika.exceptions import AMQPConnectionError
+
+
+class AmqpConsumer(object):
+    """ This Class handles amqp consumer and collects collectd data """
+    EXCHANGE = 'amq.fanout'
+    EXCHANGE_TYPE = 'fanout'
+    QUEUE = ''
+    ROUTING_KEY = 'collectd'
+
+    def __init__(self, amqp_url, queue):
+        self._connection = None
+        self._channel = None
+        self._closing = False
+        self._consumer_tag = None
+        self._url = amqp_url
+        self._queue = queue
+
+    def connect(self):
+        """ connect to amqp url """
+        try:
+            return pika.SelectConnection(pika.URLParameters(self._url),
+                                         self.on_connection_open,
+                                         stop_ioloop_on_close=False)
+        except AMQPConnectionError:
+            raise RuntimeError
+
+    def on_connection_open(self, unused_connection):
+        """ call back from pika & open channel """
+        logging.info("list of unused connections %s", unused_connection)
+        self._connection.add_on_close_callback(self.on_connection_closed)
+        self._connection.channel(on_open_callback=self.on_channel_open)
+
+    def on_connection_closed(self, connection, reply_code, reply_text):
+        """ close the amqp connections. if force close, try re-connect """
+        logging.info("amqp connection (%s)", connection)
+        self._channel = None
+        if self._closing:
+            self._connection.ioloop.stop()
+        else:
+            logging.debug(('Connection closed, reopening in 5 sec: (%s) %s',
+                           reply_code, reply_text))
+            self._connection.add_timeout(5, self.reconnect)
+
+    def reconnect(self):
+        """ re-connect amqp consumer"""
+        self._connection.ioloop.stop()
+
+        if not self._closing:
+            self._connection = self.connect()
+            self._connection.ioloop.start()
+
+    def on_channel_open(self, channel):
+        """ add close callback & setup exchange """
+        self._channel = channel
+        self.add_on_channel_close_callback()
+        self._channel.exchange_declare(self.on_exchange_declareok,
+                                       self.EXCHANGE,
+                                       self.EXCHANGE_TYPE,
+                                       durable=True, auto_delete=False)
+
+    def add_on_channel_close_callback(self):
+        """ register for close callback """
+        self._channel.add_on_close_callback(self.on_channel_closed)
+
+    def on_channel_closed(self, channel, reply_code, reply_text):
+        """ close amqp channel connection """
+        logging.info("amqp channel closed channel(%s), "
+                     "reply_code(%s) reply_text(%s)",
+                     channel, reply_code, reply_text)
+        self._connection.close()
+
+    def on_exchange_declareok(self, unused_frame):
+        """ if exchange declare is ok, setup queue """
+        logging.info("amqp exchange unused frame (%s)", unused_frame)
+        self.setup_queue(self.QUEUE)
+
+    def setup_queue(self, queue_name):
+        """ setup queue & declare same with channel """
+        logging.info("amqp queue name (%s)", queue_name)
+        self._channel.queue_declare(self.on_queue_declareok, queue_name)
+
+    def on_queue_declareok(self, method_frame):
+        """ bind queue to channel """
+        logging.info("amqp queue method frame (%s)", method_frame)
+        self._channel.queue_bind(self._on_bindok, self.QUEUE,
+                                 self.EXCHANGE, self.ROUTING_KEY)
+
+    def _on_bindok(self, unused_frame):
+        """ call back on bind start consuming data from amqp queue """
+        logging.info("amqp unused frame %s", unused_frame)
+        self.add_on_cancel_callback()
+        self._consumer_tag = self._channel.basic_consume(self.on_message,
+                                                         self.QUEUE)
+
+    def add_on_cancel_callback(self):
+        """ add cancel func to amqp callback """
+        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
+
+    def on_consumer_cancelled(self, method_frame):
+        """ on cancel close the channel """
+        logging.info("amqp method frame %s", method_frame)
+        if self._channel:
+            self._channel.close()
+
+    def on_message(self, unused_channel, basic_deliver, properties, body):
+        """ parse received data from amqp server (collectd) """
+        logging.info("amqp unused channel %s, properties %s",
+                     unused_channel, properties)
+        metrics = body.rsplit()
+        self._queue.put({metrics[1]: metrics[3]})
+        self.ack_message(basic_deliver.delivery_tag)
+
+    def ack_message(self, delivery_tag):
+        """ acknowledge amqp msg """
+        self._channel.basic_ack(delivery_tag)
+
+    def on_cancelok(self, unused_frame):
+        """ initiate amqp close channel on callback """
+        logging.info("amqp unused frame %s", unused_frame)
+        self._channel.close()
+
+    def run(self):
+        """ Initiate amqp connection. """
+        self._connection = self.connect()
+        self._connection.ioloop.start()
+
+    def stop(self):
+        """ stop amqp consuming data """
+        self._closing = True
+        if self._channel:
+            self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
+
+        if self._connection:
+            self._connection.ioloop.start()
+
+    def close_connection(self):
+        """ close amqp connection """
+        self._connection.close()
diff --git a/yardstick/network_services/nfvi/collectd.sh b/yardstick/network_services/nfvi/collectd.sh
new file mode 100755
index 000000000..7acb40431
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.sh
@@ -0,0 +1,89 @@
+#!/bin/bash
+#
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+INSTALL_NSB_BIN="/opt/nsb_bin"
+cd $INSTALL_NSB_BIN
+
+if [ "$(whoami)" != "root" ]; then
+        echo "Must be root to run $0"
+        exit 1;
+fi
+
+echo "Install required libraries to run collectd..."
+pkg=(git flex bison build-essential pkg-config automake  autotools-dev libltdl-dev librabbitmq-dev rabbitmq-server)
+for i in "${pkg[@]}"; do
+dpkg-query -W --showformat='${Status}\n' "${i}"|grep "install ok installed"
+    if [  "$?" -eq "1" ]; then
+        apt-get -y install "${i}";
+    fi
+done
+echo "Done"
+
+ldconfig -p | grep libpqos >/dev/null
+if [ $? -eq 0 ]
+then
+    echo "Intel RDT library already installed. Done"
+else
+    pushd .
+
+    echo "Get intel_rdt repo and install..."
+    rm -rf intel-cmt-cat >/dev/null
+    git clone https://github.com/01org/intel-cmt-cat.git
+    pushd intel-cmt-cat
+    git checkout tags/v1.5 -b v1.5
+    make install PREFIX=/usr
+    popd
+
+    popd
+    echo "Done."
+fi
+
+which /opt/nsb_bin/collectd/collectd >/dev/null
+if [ $? -eq 0 ]
+then
+    echo "Collectd already installed. Done"
+else
+    pushd .
+    echo "Get collectd from repository and install..."
+    rm -rf collectd >/dev/null
+    git clone https://github.com/collectd/collectd.git
+    pushd collectd
+    git stash
+    git checkout -b collectd 43a4db3b3209f497a0ba408aebf8aee385c6262d
+    ./build.sh
+    ./configure --with-libpqos=/usr/
+    make install > /dev/null
+    popd
+    echo "Done."
+    popd
+fi
+
+modprobe msr
+cp $INSTALL_NSB_BIN/collectd.conf /opt/collectd/etc/
+
+echo "Check if admin user already created"
+rabbitmqctl list_users | grep '^admin$' > /dev/null
+if [ $? -eq 0 ];
+then
+    echo "'admin' user already created..."
+else
+    echo "Creating 'admin' user for collectd data export..."
+    rabbitmqctl delete_user guest
+    rabbitmqctl add_user admin admin
+    rabbitmqctl authenticate_user admin admin
+    rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
+    echo "Done"
+fi
diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py
new file mode 100644
index 000000000..d71e1e995
--- /dev/null
+++ b/yardstick/network_services/nfvi/resource.py
@@ -0,0 +1,162 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Resource collection definitions """
+
+from __future__ import absolute_import
+import logging
+import os.path
+import re
+import multiprocessing
+from oslo_config import cfg
+
+from yardstick import ssh
+from yardstick.network_services.nfvi.collectd import AmqpConsumer
+from yardstick.network_services.utils import provision_tool
+
+CONF = cfg.CONF
+ZMQ_OVS_PORT = 5567
+ZMQ_POLLING_TIME = 12000
+
+
+class ResourceProfile(object):
+    """
+    This profile adds a resource at the beginning of the test session
+    """
+
+    def __init__(self, vnfd, cores):
+        self.enable = True
+        self.connection = None
+        self.cores = cores
+
+        mgmt_interface = vnfd.get("mgmt-interface")
+        user = mgmt_interface.get("user")
+        passwd = mgmt_interface.get("password")
+        ip_addr = mgmt_interface.get("ip")
+        self.vnfip = mgmt_interface.get("host", ip_addr)
+        ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT)
+        self.connection = ssh.SSH(user, self.vnfip,
+                                  password=passwd, port=ssh_port)
+        self.connection.wait()
+
+    def check_if_sa_running(self, process):
+        """ verify if system agent is running """
+        err, pid, _ = self.connection.execute("pgrep -f %s" % process)
+        return [err == 0, pid]
+
+    def run_collectd_amqp(self, queue):
+        """ run amqp consumer to collect the NFVi data """
+        amqp = \
+            AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip),
+                         queue)
+        try:
+            amqp.run()
+        except (AttributeError, RuntimeError, KeyboardInterrupt):
+            amqp.stop()
+
+    @classmethod
+    def get_cpu_data(cls, reskey, value):
+        """ Get cpu topology of the host """
+        pattern = r"-(\d+)"
+        if "cpufreq" in reskey[1]:
+            match = re.search(pattern, reskey[2], re.MULTILINE)
+            metric = reskey[1]
+        else:
+            match = re.search(pattern, reskey[1], re.MULTILINE)
+            metric = reskey[2]
+
+        time, val = re.split(":", value)
+        if match:
+            return [str(match.group(1)), metric, val, time]
+
+        return ["error", "Invalid", ""]
+
+    def parse_collectd_result(self, metrics, listcores):
+        """ convert collectd data into json"""
+        res = {"cpu": {}, "memory": {}}
+        testcase = ""
+
+        for key, value in metrics.items():
+            reskey = key.rsplit("/")
+            if "cpu" in reskey[1] or "intel_rdt" in reskey[1]:
+                cpu_key, name, metric, testcase = \
+                    self.get_cpu_data(reskey, value)
+                if cpu_key in listcores:
+                    res["cpu"].setdefault(cpu_key, {}).update({name: metric})
+            elif "memory" in reskey[1]:
+                val = re.split(":", value)[1]
+                res["memory"].update({reskey[2]: val})
+        res["timestamp"] = testcase
+
+        return res
+
+    def amqp_collect_nfvi_kpi(self, _queue=multiprocessing.Queue()):
+        """ amqp collect and return nfvi kpis """
+        try:
+            metric = {}
+            amqp_client = \
+                multiprocessing.Process(target=self.run_collectd_amqp,
+                                        args=(_queue,))
+            amqp_client.start()
+            amqp_client.join(7)
+            amqp_client.terminate()
+
+            while not _queue.empty():
+                metric.update(_queue.get())
+        except (AttributeError, RuntimeError, TypeError, ValueError):
+            logging.debug("Failed to get NFVi stats...")
+            msg = {}
+        else:
+            msg = self.parse_collectd_result(metric, self.cores)
+
+        return msg
+
+    @classmethod
+    def _start_collectd(cls, connection, bin_path):
+        connection.execute('pkill -9 collectd')
+        collectd = os.path.join(bin_path, "collectd.sh")
+        provision_tool(connection, collectd)
+        provision_tool(connection, os.path.join(bin_path, "collectd.conf"))
+
+        # Reset amqp queue
+        connection.execute("sudo service rabbitmq-server start")
+        connection.execute("sudo rabbitmqctl stop_app")
+        connection.execute("sudo rabbitmqctl reset")
+        connection.execute("sudo rabbitmqctl start_app")
+        connection.execute("sudo service rabbitmq-server restart")
+
+        # Run collectd
+        connection.execute(collectd)
+        connection.execute(os.path.join(bin_path, "collectd", "collectd"))
+
+    def initiate_systemagent(self, bin_path):
+        """ Start system agent for NFVi collection on host """
+        if self.enable:
+            self._start_collectd(self.connection, bin_path)
+
+    def start(self):
+        """ start nfvi collection """
+        if self.enable:
+            logging.debug("Start NVFi metric collection...")
+
+    def stop(self):
+        """ stop nfvi collection """
+        if self.enable:
+            agent = "collectd"
+            logging.debug("Stop resource monitor...")
+            status, pid = self.check_if_sa_running(agent)
+            if status:
+                self.connection.execute('kill -9 %s' % pid)
+                self.connection.execute('pkill -9 %s' % agent)
+                self.connection.execute('service rabbitmq-server stop')
+                self.connection.execute("sudo rabbitmqctl stop_app")
-- 
cgit