aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/network_services/nfvi/collectd.py
diff options
context:
space:
mode:
authorDeepak S <deepak.s@linux.intel.com>2016-12-30 09:22:25 -0800
committerDeepak S <deepak.s@linux.intel.com>2017-01-19 08:28:10 +0530
commitddb76faa5841997bd3eec4ed2f3d33f56e66d0c3 (patch)
tree2970946f924aedcc318158d19e24a3252cdf0cd5 /yardstick/network_services/nfvi/collectd.py
parent4c02cf5d19e2c36c9747a1c05d86331a1918baec (diff)
Add infrastructure to add the NFVi KPI collections
This patches added common function to collect NFVi KPIs for given usecases - Core KPIs like memory/LLC/IPC etc - OVS stats - memory stats etc. JIRA: YARDSTICK-488 Change-Id: Iab41146392efc47b7313b1846a67728a44d0f1d6 Signed-off-by: Deepak S <deepak.s@linux.intel.com>
Diffstat (limited to 'yardstick/network_services/nfvi/collectd.py')
-rw-r--r--yardstick/network_services/nfvi/collectd.py158
1 files changed, 158 insertions, 0 deletions
diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py
new file mode 100644
index 000000000..ea80e4ff8
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.py
@@ -0,0 +1,158 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" AMQP Consumer senario definition """
+
+from __future__ import absolute_import
+from __future__ import print_function
+import logging
+import pika
+from pika.exceptions import AMQPConnectionError
+
+
+class AmqpConsumer(object):
+ """ This Class handles amqp consumer and collects collectd data """
+ EXCHANGE = 'amq.fanout'
+ EXCHANGE_TYPE = 'fanout'
+ QUEUE = ''
+ ROUTING_KEY = 'collectd'
+
+ def __init__(self, amqp_url, queue):
+ self._connection = None
+ self._channel = None
+ self._closing = False
+ self._consumer_tag = None
+ self._url = amqp_url
+ self._queue = queue
+
+ def connect(self):
+ """ connect to amqp url """
+ try:
+ return pika.SelectConnection(pika.URLParameters(self._url),
+ self.on_connection_open,
+ stop_ioloop_on_close=False)
+ except AMQPConnectionError:
+ raise RuntimeError
+
+ def on_connection_open(self, unused_connection):
+ """ call back from pika & open channel """
+ logging.info("list of unused connections %s", unused_connection)
+ self._connection.add_on_close_callback(self.on_connection_closed)
+ self._connection.channel(on_open_callback=self.on_channel_open)
+
+ def on_connection_closed(self, connection, reply_code, reply_text):
+ """ close the amqp connections. if force close, try re-connect """
+ logging.info("amqp connection (%s)", connection)
+ self._channel = None
+ if self._closing:
+ self._connection.ioloop.stop()
+ else:
+ logging.debug(('Connection closed, reopening in 5 sec: (%s) %s',
+ reply_code, reply_text))
+ self._connection.add_timeout(5, self.reconnect)
+
+ def reconnect(self):
+ """ re-connect amqp consumer"""
+ self._connection.ioloop.stop()
+
+ if not self._closing:
+ self._connection = self.connect()
+ self._connection.ioloop.start()
+
+ def on_channel_open(self, channel):
+ """ add close callback & setup exchange """
+ self._channel = channel
+ self.add_on_channel_close_callback()
+ self._channel.exchange_declare(self.on_exchange_declareok,
+ self.EXCHANGE,
+ self.EXCHANGE_TYPE,
+ durable=True, auto_delete=False)
+
+ def add_on_channel_close_callback(self):
+ """ register for close callback """
+ self._channel.add_on_close_callback(self.on_channel_closed)
+
+ def on_channel_closed(self, channel, reply_code, reply_text):
+ """ close amqp channel connection """
+ logging.info("amqp channel closed channel(%s), "
+ "reply_code(%s) reply_text(%s)",
+ channel, reply_code, reply_text)
+ self._connection.close()
+
+ def on_exchange_declareok(self, unused_frame):
+ """ if exchange declare is ok, setup queue """
+ logging.info("amqp exchange unused frame (%s)", unused_frame)
+ self.setup_queue(self.QUEUE)
+
+ def setup_queue(self, queue_name):
+ """ setup queue & declare same with channel """
+ logging.info("amqp queue name (%s)", queue_name)
+ self._channel.queue_declare(self.on_queue_declareok, queue_name)
+
+ def on_queue_declareok(self, method_frame):
+ """ bind queue to channel """
+ logging.info("amqp queue method frame (%s)", method_frame)
+ self._channel.queue_bind(self._on_bindok, self.QUEUE,
+ self.EXCHANGE, self.ROUTING_KEY)
+
+ def _on_bindok(self, unused_frame):
+ """ call back on bind start consuming data from amqp queue """
+ logging.info("amqp unused frame %s", unused_frame)
+ self.add_on_cancel_callback()
+ self._consumer_tag = self._channel.basic_consume(self.on_message,
+ self.QUEUE)
+
+ def add_on_cancel_callback(self):
+ """ add cancel func to amqp callback """
+ self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
+
+ def on_consumer_cancelled(self, method_frame):
+ """ on cancel close the channel """
+ logging.info("amqp method frame %s", method_frame)
+ if self._channel:
+ self._channel.close()
+
+ def on_message(self, unused_channel, basic_deliver, properties, body):
+ """ parse received data from amqp server (collectd) """
+ logging.info("amqp unused channel %s, properties %s",
+ unused_channel, properties)
+ metrics = body.rsplit()
+ self._queue.put({metrics[1]: metrics[3]})
+ self.ack_message(basic_deliver.delivery_tag)
+
+ def ack_message(self, delivery_tag):
+ """ acknowledge amqp msg """
+ self._channel.basic_ack(delivery_tag)
+
+ def on_cancelok(self, unused_frame):
+ """ initiate amqp close channel on callback """
+ logging.info("amqp unused frame %s", unused_frame)
+ self._channel.close()
+
+ def run(self):
+ """ Initiate amqp connection. """
+ self._connection = self.connect()
+ self._connection.ioloop.start()
+
+ def stop(self):
+ """ stop amqp consuming data """
+ self._closing = True
+ if self._channel:
+ self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
+
+ if self._connection:
+ self._connection.ioloop.start()
+
+ def close_connection(self):
+ """ close amqp connection """
+ self._connection.close()