aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/network_services/nfvi
diff options
context:
space:
mode:
authorDeepak S <deepak.s@linux.intel.com>2016-12-30 09:22:25 -0800
committerDeepak S <deepak.s@linux.intel.com>2017-01-19 08:28:10 +0530
commitddb76faa5841997bd3eec4ed2f3d33f56e66d0c3 (patch)
tree2970946f924aedcc318158d19e24a3252cdf0cd5 /yardstick/network_services/nfvi
parent4c02cf5d19e2c36c9747a1c05d86331a1918baec (diff)
Add infrastructure to add the NFVi KPI collections
This patches added common function to collect NFVi KPIs for given usecases - Core KPIs like memory/LLC/IPC etc - OVS stats - memory stats etc. JIRA: YARDSTICK-488 Change-Id: Iab41146392efc47b7313b1846a67728a44d0f1d6 Signed-off-by: Deepak S <deepak.s@linux.intel.com>
Diffstat (limited to 'yardstick/network_services/nfvi')
-rw-r--r--yardstick/network_services/nfvi/__init__.py0
-rw-r--r--yardstick/network_services/nfvi/collectd.conf80
-rw-r--r--yardstick/network_services/nfvi/collectd.py158
-rwxr-xr-xyardstick/network_services/nfvi/collectd.sh89
-rw-r--r--yardstick/network_services/nfvi/resource.py162
5 files changed, 489 insertions, 0 deletions
diff --git a/yardstick/network_services/nfvi/__init__.py b/yardstick/network_services/nfvi/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/yardstick/network_services/nfvi/__init__.py
diff --git a/yardstick/network_services/nfvi/collectd.conf b/yardstick/network_services/nfvi/collectd.conf
new file mode 100644
index 000000000..abcf24ded
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.conf
@@ -0,0 +1,80 @@
+# Config file for collectd(1).
+#
+# Some plugins need additional configuration and are disabled by default.
+# Please read collectd.conf(5) for details.
+#
+# You should also read /usr/share/doc/collectd-core/README.Debian.plugins
+# before enabling any more plugins.
+
+##############################################################################
+# Global #
+#----------------------------------------------------------------------------#
+# Global settings for the daemon. #
+##############################################################################
+
+Hostname "nsb_stats"
+FQDNLookup true
+
+Interval 5
+
+##############################################################################
+# LoadPlugin section #
+#----------------------------------------------------------------------------#
+# Specify what features to activate. #
+##############################################################################
+
+LoadPlugin amqp
+LoadPlugin cpu
+LoadPlugin intel_rdt
+LoadPlugin memory
+
+##############################################################################
+# Plugin configuration #
+#----------------------------------------------------------------------------#
+# In this section configuration stubs for each plugin are provided. A desc- #
+# ription of those options is available in the collectd.conf(5) manual page. #
+##############################################################################
+
+<Plugin amqp>
+ <Publish "name">
+ Host "0.0.0.0"
+ Port "5672"
+ VHost "/"
+ User "admin"
+ Password "admin"
+ Exchange "amq.fanout"
+ RoutingKey "collectd"
+ Persistent false
+ StoreRates false
+ ConnectionRetryDelay 0
+ </Publish>
+</Plugin>
+
+<Plugin cpu>
+ ReportByCpu true
+ ReportByState true
+ ValuesPercentage false
+</Plugin>
+
+<Plugin memory>
+ ValuesAbsolute true
+ ValuesPercentage false
+</Plugin>
+
+<LoadPlugin intel_rdt>
+ Interval 5
+</LoadPlugin>
+<Plugin "intel_rdt">
+ Cores ""
+</Plugin>
+
+<Plugin memcached>
+ <Instance "local">
+ Host "127.0.0.1"
+ Port "11211"
+ </Instance>
+</Plugin>
+
+<Include "/etc/collectd/collectd.conf.d">
+ Filter "*.conf"
+</Include>
diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py
new file mode 100644
index 000000000..ea80e4ff8
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.py
@@ -0,0 +1,158 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" AMQP Consumer senario definition """
+
+from __future__ import absolute_import
+from __future__ import print_function
+import logging
+import pika
+from pika.exceptions import AMQPConnectionError
+
+
+class AmqpConsumer(object):
+ """ This Class handles amqp consumer and collects collectd data """
+ EXCHANGE = 'amq.fanout'
+ EXCHANGE_TYPE = 'fanout'
+ QUEUE = ''
+ ROUTING_KEY = 'collectd'
+
+ def __init__(self, amqp_url, queue):
+ self._connection = None
+ self._channel = None
+ self._closing = False
+ self._consumer_tag = None
+ self._url = amqp_url
+ self._queue = queue
+
+ def connect(self):
+ """ connect to amqp url """
+ try:
+ return pika.SelectConnection(pika.URLParameters(self._url),
+ self.on_connection_open,
+ stop_ioloop_on_close=False)
+ except AMQPConnectionError:
+ raise RuntimeError
+
+ def on_connection_open(self, unused_connection):
+ """ call back from pika & open channel """
+ logging.info("list of unused connections %s", unused_connection)
+ self._connection.add_on_close_callback(self.on_connection_closed)
+ self._connection.channel(on_open_callback=self.on_channel_open)
+
+ def on_connection_closed(self, connection, reply_code, reply_text):
+ """ close the amqp connections. if force close, try re-connect """
+ logging.info("amqp connection (%s)", connection)
+ self._channel = None
+ if self._closing:
+ self._connection.ioloop.stop()
+ else:
+ logging.debug(('Connection closed, reopening in 5 sec: (%s) %s',
+ reply_code, reply_text))
+ self._connection.add_timeout(5, self.reconnect)
+
+ def reconnect(self):
+ """ re-connect amqp consumer"""
+ self._connection.ioloop.stop()
+
+ if not self._closing:
+ self._connection = self.connect()
+ self._connection.ioloop.start()
+
+ def on_channel_open(self, channel):
+ """ add close callback & setup exchange """
+ self._channel = channel
+ self.add_on_channel_close_callback()
+ self._channel.exchange_declare(self.on_exchange_declareok,
+ self.EXCHANGE,
+ self.EXCHANGE_TYPE,
+ durable=True, auto_delete=False)
+
+ def add_on_channel_close_callback(self):
+ """ register for close callback """
+ self._channel.add_on_close_callback(self.on_channel_closed)
+
+ def on_channel_closed(self, channel, reply_code, reply_text):
+ """ close amqp channel connection """
+ logging.info("amqp channel closed channel(%s), "
+ "reply_code(%s) reply_text(%s)",
+ channel, reply_code, reply_text)
+ self._connection.close()
+
+ def on_exchange_declareok(self, unused_frame):
+ """ if exchange declare is ok, setup queue """
+ logging.info("amqp exchange unused frame (%s)", unused_frame)
+ self.setup_queue(self.QUEUE)
+
+ def setup_queue(self, queue_name):
+ """ setup queue & declare same with channel """
+ logging.info("amqp queue name (%s)", queue_name)
+ self._channel.queue_declare(self.on_queue_declareok, queue_name)
+
+ def on_queue_declareok(self, method_frame):
+ """ bind queue to channel """
+ logging.info("amqp queue method frame (%s)", method_frame)
+ self._channel.queue_bind(self._on_bindok, self.QUEUE,
+ self.EXCHANGE, self.ROUTING_KEY)
+
+ def _on_bindok(self, unused_frame):
+ """ call back on bind start consuming data from amqp queue """
+ logging.info("amqp unused frame %s", unused_frame)
+ self.add_on_cancel_callback()
+ self._consumer_tag = self._channel.basic_consume(self.on_message,
+ self.QUEUE)
+
+ def add_on_cancel_callback(self):
+ """ add cancel func to amqp callback """
+ self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
+
+ def on_consumer_cancelled(self, method_frame):
+ """ on cancel close the channel """
+ logging.info("amqp method frame %s", method_frame)
+ if self._channel:
+ self._channel.close()
+
+ def on_message(self, unused_channel, basic_deliver, properties, body):
+ """ parse received data from amqp server (collectd) """
+ logging.info("amqp unused channel %s, properties %s",
+ unused_channel, properties)
+ metrics = body.rsplit()
+ self._queue.put({metrics[1]: metrics[3]})
+ self.ack_message(basic_deliver.delivery_tag)
+
+ def ack_message(self, delivery_tag):
+ """ acknowledge amqp msg """
+ self._channel.basic_ack(delivery_tag)
+
+ def on_cancelok(self, unused_frame):
+ """ initiate amqp close channel on callback """
+ logging.info("amqp unused frame %s", unused_frame)
+ self._channel.close()
+
+ def run(self):
+ """ Initiate amqp connection. """
+ self._connection = self.connect()
+ self._connection.ioloop.start()
+
+ def stop(self):
+ """ stop amqp consuming data """
+ self._closing = True
+ if self._channel:
+ self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
+
+ if self._connection:
+ self._connection.ioloop.start()
+
+ def close_connection(self):
+ """ close amqp connection """
+ self._connection.close()
diff --git a/yardstick/network_services/nfvi/collectd.sh b/yardstick/network_services/nfvi/collectd.sh
new file mode 100755
index 000000000..7acb40431
--- /dev/null
+++ b/yardstick/network_services/nfvi/collectd.sh
@@ -0,0 +1,89 @@
+#!/bin/bash
+#
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+INSTALL_NSB_BIN="/opt/nsb_bin"
+cd $INSTALL_NSB_BIN
+
+if [ "$(whoami)" != "root" ]; then
+ echo "Must be root to run $0"
+ exit 1;
+fi
+
+echo "Install required libraries to run collectd..."
+pkg=(git flex bison build-essential pkg-config automake autotools-dev libltdl-dev librabbitmq-dev rabbitmq-server)
+for i in "${pkg[@]}"; do
+dpkg-query -W --showformat='${Status}\n' "${i}"|grep "install ok installed"
+ if [ "$?" -eq "1" ]; then
+ apt-get -y install "${i}";
+ fi
+done
+echo "Done"
+
+ldconfig -p | grep libpqos >/dev/null
+if [ $? -eq 0 ]
+then
+ echo "Intel RDT library already installed. Done"
+else
+ pushd .
+
+ echo "Get intel_rdt repo and install..."
+ rm -rf intel-cmt-cat >/dev/null
+ git clone https://github.com/01org/intel-cmt-cat.git
+ pushd intel-cmt-cat
+ git checkout tags/v1.5 -b v1.5
+ make install PREFIX=/usr
+ popd
+
+ popd
+ echo "Done."
+fi
+
+which /opt/nsb_bin/collectd/collectd >/dev/null
+if [ $? -eq 0 ]
+then
+ echo "Collectd already installed. Done"
+else
+ pushd .
+ echo "Get collectd from repository and install..."
+ rm -rf collectd >/dev/null
+ git clone https://github.com/collectd/collectd.git
+ pushd collectd
+ git stash
+ git checkout -b collectd 43a4db3b3209f497a0ba408aebf8aee385c6262d
+ ./build.sh
+ ./configure --with-libpqos=/usr/
+ make install > /dev/null
+ popd
+ echo "Done."
+ popd
+fi
+
+modprobe msr
+cp $INSTALL_NSB_BIN/collectd.conf /opt/collectd/etc/
+
+echo "Check if admin user already created"
+rabbitmqctl list_users | grep '^admin$' > /dev/null
+if [ $? -eq 0 ];
+then
+ echo "'admin' user already created..."
+else
+ echo "Creating 'admin' user for collectd data export..."
+ rabbitmqctl delete_user guest
+ rabbitmqctl add_user admin admin
+ rabbitmqctl authenticate_user admin admin
+ rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
+ echo "Done"
+fi
diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py
new file mode 100644
index 000000000..d71e1e995
--- /dev/null
+++ b/yardstick/network_services/nfvi/resource.py
@@ -0,0 +1,162 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Resource collection definitions """
+
+from __future__ import absolute_import
+import logging
+import os.path
+import re
+import multiprocessing
+from oslo_config import cfg
+
+from yardstick import ssh
+from yardstick.network_services.nfvi.collectd import AmqpConsumer
+from yardstick.network_services.utils import provision_tool
+
+CONF = cfg.CONF
+ZMQ_OVS_PORT = 5567
+ZMQ_POLLING_TIME = 12000
+
+
+class ResourceProfile(object):
+ """
+ This profile adds a resource at the beginning of the test session
+ """
+
+ def __init__(self, vnfd, cores):
+ self.enable = True
+ self.connection = None
+ self.cores = cores
+
+ mgmt_interface = vnfd.get("mgmt-interface")
+ user = mgmt_interface.get("user")
+ passwd = mgmt_interface.get("password")
+ ip_addr = mgmt_interface.get("ip")
+ self.vnfip = mgmt_interface.get("host", ip_addr)
+ ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT)
+ self.connection = ssh.SSH(user, self.vnfip,
+ password=passwd, port=ssh_port)
+ self.connection.wait()
+
+ def check_if_sa_running(self, process):
+ """ verify if system agent is running """
+ err, pid, _ = self.connection.execute("pgrep -f %s" % process)
+ return [err == 0, pid]
+
+ def run_collectd_amqp(self, queue):
+ """ run amqp consumer to collect the NFVi data """
+ amqp = \
+ AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip),
+ queue)
+ try:
+ amqp.run()
+ except (AttributeError, RuntimeError, KeyboardInterrupt):
+ amqp.stop()
+
+ @classmethod
+ def get_cpu_data(cls, reskey, value):
+ """ Get cpu topology of the host """
+ pattern = r"-(\d+)"
+ if "cpufreq" in reskey[1]:
+ match = re.search(pattern, reskey[2], re.MULTILINE)
+ metric = reskey[1]
+ else:
+ match = re.search(pattern, reskey[1], re.MULTILINE)
+ metric = reskey[2]
+
+ time, val = re.split(":", value)
+ if match:
+ return [str(match.group(1)), metric, val, time]
+
+ return ["error", "Invalid", ""]
+
+ def parse_collectd_result(self, metrics, listcores):
+ """ convert collectd data into json"""
+ res = {"cpu": {}, "memory": {}}
+ testcase = ""
+
+ for key, value in metrics.items():
+ reskey = key.rsplit("/")
+ if "cpu" in reskey[1] or "intel_rdt" in reskey[1]:
+ cpu_key, name, metric, testcase = \
+ self.get_cpu_data(reskey, value)
+ if cpu_key in listcores:
+ res["cpu"].setdefault(cpu_key, {}).update({name: metric})
+ elif "memory" in reskey[1]:
+ val = re.split(":", value)[1]
+ res["memory"].update({reskey[2]: val})
+ res["timestamp"] = testcase
+
+ return res
+
+ def amqp_collect_nfvi_kpi(self, _queue=multiprocessing.Queue()):
+ """ amqp collect and return nfvi kpis """
+ try:
+ metric = {}
+ amqp_client = \
+ multiprocessing.Process(target=self.run_collectd_amqp,
+ args=(_queue,))
+ amqp_client.start()
+ amqp_client.join(7)
+ amqp_client.terminate()
+
+ while not _queue.empty():
+ metric.update(_queue.get())
+ except (AttributeError, RuntimeError, TypeError, ValueError):
+ logging.debug("Failed to get NFVi stats...")
+ msg = {}
+ else:
+ msg = self.parse_collectd_result(metric, self.cores)
+
+ return msg
+
+ @classmethod
+ def _start_collectd(cls, connection, bin_path):
+ connection.execute('pkill -9 collectd')
+ collectd = os.path.join(bin_path, "collectd.sh")
+ provision_tool(connection, collectd)
+ provision_tool(connection, os.path.join(bin_path, "collectd.conf"))
+
+ # Reset amqp queue
+ connection.execute("sudo service rabbitmq-server start")
+ connection.execute("sudo rabbitmqctl stop_app")
+ connection.execute("sudo rabbitmqctl reset")
+ connection.execute("sudo rabbitmqctl start_app")
+ connection.execute("sudo service rabbitmq-server restart")
+
+ # Run collectd
+ connection.execute(collectd)
+ connection.execute(os.path.join(bin_path, "collectd", "collectd"))
+
+ def initiate_systemagent(self, bin_path):
+ """ Start system agent for NFVi collection on host """
+ if self.enable:
+ self._start_collectd(self.connection, bin_path)
+
+ def start(self):
+ """ start nfvi collection """
+ if self.enable:
+ logging.debug("Start NVFi metric collection...")
+
+ def stop(self):
+ """ stop nfvi collection """
+ if self.enable:
+ agent = "collectd"
+ logging.debug("Stop resource monitor...")
+ status, pid = self.check_if_sa_running(agent)
+ if status:
+ self.connection.execute('kill -9 %s' % pid)
+ self.connection.execute('pkill -9 %s' % agent)
+ self.connection.execute('service rabbitmq-server stop')
+ self.connection.execute("sudo rabbitmqctl stop_app")