summaryrefslogtreecommitdiffstats
path: root/3rd_party/collectd-ves-plugin/ves_plugin
diff options
context:
space:
mode:
authorMytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>2017-02-08 16:01:05 +0000
committerMytnyk, Volodymyr <volodymyrx.mytnyk@intel.com>2017-05-18 17:11:00 +0100
commit6facb2d37042ea8f189300e07dd97ba0ad6492ad (patch)
treed86359cd27699e09c59726560249a501684a218c /3rd_party/collectd-ves-plugin/ves_plugin
parentcfd433e3367b3df607550e95b6110e431d9a6a4c (diff)
VES plugin: update to latest VES event schema
- Updated the plugin to lates VES schema - Migrated VES plugin to python3 (with backward compatibility) - Fixed issue with CPU total calculation - Fixed VES documentation Change-Id: Ic8b0419146a9c75a48907f39adda1351f3b3bc73 Signed-off-by: Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Diffstat (limited to '3rd_party/collectd-ves-plugin/ves_plugin')
-rw-r--r--3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py82
1 files changed, 50 insertions, 32 deletions
diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py
index 3e300893..1ab8f679 100644
--- a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py
+++ b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py
@@ -24,7 +24,12 @@ import collectd
import json
import sys
import base64
-import urllib2
+try:
+ # For Python 3.0 and later
+ import urllib.request as url
+except ImportError:
+ # Fall back to Python 2's urllib2
+ import urllib2 as url
import socket
import time
from threading import Timer
@@ -70,7 +75,7 @@ class Event(object):
'commonEventHeader' : obj,
self.get_name() : self.get_obj()
}
- })
+ }).encode()
def get_name():
assert False, 'abstract method get_name() is not implemented'
@@ -114,7 +119,12 @@ class MeasurementsForVfScaling(Event):
self.concurrent_sessions = 0
self.configured_entities = 0
self.cpu_usage_array = []
- self.errors = []
+ self.errors = {
+ "receiveDiscards" : 0,
+ "receiveErrors" : 0,
+ "transmitDiscards" : 0,
+ "transmitErrors" : 0
+ }
self.feature_usage_array = []
self.filesystem_usage_array = []
self.latency_distribution = []
@@ -240,11 +250,11 @@ class VESPlugin(object):
'UseHttps' : False,
'SendEventInterval' : 20.0,
'FunctionalRole' : 'Collectd VES Agent',
- 'GuestRunning' : False
+ 'GuestRunning' : False,
+ 'ApiVersion' : 1.0
}
self.__host_name = None
self.__ves_timer = None
- self.__event_timer_interval = 20.0
self.__lock = Lock()
self.__event_id = 0
@@ -263,7 +273,7 @@ class VESPlugin(object):
def start_timer(self):
"""Start event timer"""
- self.__ves_timer = Timer(self.__event_timer_interval, self.__on_time)
+ self.__ves_timer = Timer(self.__plugin_config['SendEventInterval'], self.__on_time)
self.__ves_timer.start()
def stop_timer(self):
@@ -272,30 +282,34 @@ class VESPlugin(object):
def __on_time(self):
"""Timer thread"""
- self.start_timer()
self.event_timer()
+ self.start_timer()
def event_send(self, event):
"""Send event to VES"""
- server_url = "http{}://{}:{}/{}eventListener/v3{}".format(
+ server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
's' if self.__plugin_config['UseHttps'] else '', self.__plugin_config['Domain'],
- int(self.__plugin_config['Port']), '{}/'.format(
- '/{}'.format(self.__plugin_config['Path'])) if (len(self.__plugin_config['Path']) > 0) else '',
- self.__plugin_config['Topic'])
+ int(self.__plugin_config['Port']), '{}'.format(
+ '/{}'.format(self.__plugin_config['Path']) if (len(self.__plugin_config['Path']) > 0) else ''),
+ int(self.__plugin_config['ApiVersion']), '{}'.format(
+ '/{}'.format(self.__plugin_config['Topic']) if (len(self.__plugin_config['Topic']) > 0) else ''))
collectd.info('Vendor Event Listener is at: {}'.format(server_url))
credentials = base64.b64encode('{}:{}'.format(
- self.__plugin_config['Username'], self.__plugin_config['Password']))
+ self.__plugin_config['Username'], self.__plugin_config['Password']).encode()).decode()
collectd.info('Authentication credentials are: {}'.format(credentials))
try:
- request = urllib2.Request(server_url)
+ request = url.Request(server_url)
request.add_header('Authorization', 'Basic {}'.format(credentials))
request.add_header('Content-Type', 'application/json')
collectd.debug("Sending {} to {}".format(event.get_json(), server_url))
- vel = urllib2.urlopen(request, event.get_json(), timeout=1)
- except urllib2.HTTPError as e:
+ vel = url.urlopen(request, event.get_json(), timeout=1)
+ collectd.debug("Sent data to {} successfully".format(server_url))
+ except url.HTTPError as e:
collectd.error('Vendor Event Listener exception: {}'.format(e))
- except urllib2.URLError as e:
+ except url.URLError as e:
collectd.error('Vendor Event Listener is is not reachable: {}'.format(e))
+ except:
+ collectd.error('Vendor Event Listener unknown error')
def bytes_to_gb(self, bytes):
"""Convert bytes to GB"""
@@ -388,23 +402,24 @@ class VESPlugin(object):
reporting_entity = '{}-{}-{}'.format(self.get_hostname(), 'virt', vm_name)
measurement.reporting_entity_id = reporting_entity
measurement.reporting_entity_name = reporting_entity
+ # virt_vcpu
+ virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
+ plugin_name='virt', type_name='virt_vcpu')
+ if len(virt_vcpus) > 0:
+ for virt_vcpu in virt_vcpus:
+ cpu_usage = self.cpu_ns_to_percentage(virt_vcpu)
+ measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage)
# virt_cpu_total
virt_vcpu_total = self.cache_get_value(plugin_instance=vm_name,
plugin_name='virt', type_name='virt_cpu_total')
if len(virt_vcpu_total) > 0:
- measurement.aggregate_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0])
+ average_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0]) / len(virt_vcpus)
+ measurement.aggregate_cpu_usage = average_cpu_usage
# set source as a host for virt_vcpu_total value
measurement.source_id = virt_vcpu_total[0]['host']
measurement.source_name = measurement.source_id
# fill out EpochMicrosec (convert to us)
measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
- # virt_vcp
- virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
- plugin_name='virt', type_name='virt_vcpu')
- if len(virt_vcpus) > 0:
- for virt_vcpu in virt_vcpus:
- cpu_usage = self.cpu_ns_to_percentage(virt_vcpu)
- measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage)
# plugin interval
measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
# memory-total
@@ -456,7 +471,7 @@ class VESPlugin(object):
'-{}'.format(val['type_instance']) if len(val['type_instance']) else '')
mgroup = MeasurementGroup(mgroup_name)
ds = collectd.get_dataset(val['type'])
- for index in xrange(len(ds)):
+ for index in range(len(ds)):
mname = '{}-{}'.format(val['type'], ds[index][0])
mgroup.add_measurement(mname, str(val['values'][index]))
measurement.add_measurement_group(mgroup);
@@ -476,6 +491,10 @@ class VESPlugin(object):
pre_total_time, pre_total, total_time, total, round(percent, 2)))
return round(percent, 2)
+ def make_dash_string(self, *args):
+ """Join non empty strings with dash symbol"""
+ return '-'.join(filter(lambda x: len(x) > 0, args))
+
def config(self, config):
"""Collectd config callback"""
for child in config.children:
@@ -485,8 +504,9 @@ class VESPlugin(object):
raise RuntimeError('Configuration key name error')
# check the config entry value type
if len(child.values) == 0 or type(child.values[0]) != type(self.__plugin_config[child.key]):
- collectd.error("Key '{}' value type should be {}".format(
- child.key, str(type(self.__plugin_config[child.key]))))
+ collectd.error("Key '{}' value type '{}' should be {}".format(
+ child.key, str(type(child.values[0])),
+ str(type(self.__plugin_config[child.key]))))
raise RuntimeError('Configuration key value error')
# store the value in configuration
self.__plugin_config[child.key] = child.values[0]
@@ -505,7 +525,7 @@ class VESPlugin(object):
if vl.plugin not in self.__plugin_data_cache:
self.__plugin_data_cache[vl.plugin] = {'vls': []}
plugin_vl = self.__plugin_data_cache[vl.plugin]['vls']
- for index in xrange(len(plugin_vl)):
+ for index in range(len(plugin_vl)):
# record found, so just update time the values
if (plugin_vl[index]['plugin_instance'] ==
vl.plugin_instance) and (plugin_vl[index]['type_instance'] ==
@@ -593,10 +613,8 @@ class VESPlugin(object):
fault.last_epoch_micro_sec = fault.start_epoch_microsec
# fill out fault header
fault.event_severity = collectd_event_severity_map[n.severity]
- fault.specific_problem = '{}{}'.format('{}-'.format(n.plugin_instance
- if len(n.plugin_instance) else ''), n.type_instance)
- fault.alarm_interface_a = '{}{}'.format(n.plugin, '-{}'.format(
- n.plugin_instance if len(n.plugin_instance) else ''))
+ fault.specific_problem = self.make_dash_string(n.plugin_instance, n.type_instance)
+ fault.alarm_interface_a = self.make_dash_string(n.plugin, n.plugin_instance)
fault.event_source_type = 'virtualMachine(8)' if self.__plugin_config['GuestRunning'] else 'host(3)'
fault.alarm_condition = n.message
self.event_send(fault)