From 6facb2d37042ea8f189300e07dd97ba0ad6492ad Mon Sep 17 00:00:00 2001 From: "Mytnyk, VolodymyrX" Date: Wed, 8 Feb 2017 16:01:05 +0000 Subject: VES plugin: update to latest VES event schema - Updated the plugin to lates VES schema - Migrated VES plugin to python3 (with backward compatibility) - Fixed issue with CPU total calculation - Fixed VES documentation Change-Id: Ic8b0419146a9c75a48907f39adda1351f3b3bc73 Signed-off-by: Mytnyk, VolodymyrX --- .../collectd-ves-plugin/ves_plugin/ves_plugin.py | 82 +++++++++++++--------- 1 file changed, 50 insertions(+), 32 deletions(-) (limited to '3rd_party') diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py index 3e300893..1ab8f679 100644 --- a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py +++ b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py @@ -24,7 +24,12 @@ import collectd import json import sys import base64 -import urllib2 +try: + # For Python 3.0 and later + import urllib.request as url +except ImportError: + # Fall back to Python 2's urllib2 + import urllib2 as url import socket import time from threading import Timer @@ -70,7 +75,7 @@ class Event(object): 'commonEventHeader' : obj, self.get_name() : self.get_obj() } - }) + }).encode() def get_name(): assert False, 'abstract method get_name() is not implemented' @@ -114,7 +119,12 @@ class MeasurementsForVfScaling(Event): self.concurrent_sessions = 0 self.configured_entities = 0 self.cpu_usage_array = [] - self.errors = [] + self.errors = { + "receiveDiscards" : 0, + "receiveErrors" : 0, + "transmitDiscards" : 0, + "transmitErrors" : 0 + } self.feature_usage_array = [] self.filesystem_usage_array = [] self.latency_distribution = [] @@ -240,11 +250,11 @@ class VESPlugin(object): 'UseHttps' : False, 'SendEventInterval' : 20.0, 'FunctionalRole' : 'Collectd VES Agent', - 'GuestRunning' : False + 'GuestRunning' : False, + 'ApiVersion' : 1.0 } self.__host_name = None self.__ves_timer = None - self.__event_timer_interval = 20.0 self.__lock = Lock() self.__event_id = 0 @@ -263,7 +273,7 @@ class VESPlugin(object): def start_timer(self): """Start event timer""" - self.__ves_timer = Timer(self.__event_timer_interval, self.__on_time) + self.__ves_timer = Timer(self.__plugin_config['SendEventInterval'], self.__on_time) self.__ves_timer.start() def stop_timer(self): @@ -272,30 +282,34 @@ class VESPlugin(object): def __on_time(self): """Timer thread""" - self.start_timer() self.event_timer() + self.start_timer() def event_send(self, event): """Send event to VES""" - server_url = "http{}://{}:{}/{}eventListener/v3{}".format( + server_url = "http{}://{}:{}{}/eventListener/v{}{}".format( 's' if self.__plugin_config['UseHttps'] else '', self.__plugin_config['Domain'], - int(self.__plugin_config['Port']), '{}/'.format( - '/{}'.format(self.__plugin_config['Path'])) if (len(self.__plugin_config['Path']) > 0) else '', - self.__plugin_config['Topic']) + int(self.__plugin_config['Port']), '{}'.format( + '/{}'.format(self.__plugin_config['Path']) if (len(self.__plugin_config['Path']) > 0) else ''), + int(self.__plugin_config['ApiVersion']), '{}'.format( + '/{}'.format(self.__plugin_config['Topic']) if (len(self.__plugin_config['Topic']) > 0) else '')) collectd.info('Vendor Event Listener is at: {}'.format(server_url)) credentials = base64.b64encode('{}:{}'.format( - self.__plugin_config['Username'], self.__plugin_config['Password'])) + self.__plugin_config['Username'], self.__plugin_config['Password']).encode()).decode() collectd.info('Authentication credentials are: {}'.format(credentials)) try: - request = urllib2.Request(server_url) + request = url.Request(server_url) request.add_header('Authorization', 'Basic {}'.format(credentials)) request.add_header('Content-Type', 'application/json') collectd.debug("Sending {} to {}".format(event.get_json(), server_url)) - vel = urllib2.urlopen(request, event.get_json(), timeout=1) - except urllib2.HTTPError as e: + vel = url.urlopen(request, event.get_json(), timeout=1) + collectd.debug("Sent data to {} successfully".format(server_url)) + except url.HTTPError as e: collectd.error('Vendor Event Listener exception: {}'.format(e)) - except urllib2.URLError as e: + except url.URLError as e: collectd.error('Vendor Event Listener is is not reachable: {}'.format(e)) + except: + collectd.error('Vendor Event Listener unknown error') def bytes_to_gb(self, bytes): """Convert bytes to GB""" @@ -388,23 +402,24 @@ class VESPlugin(object): reporting_entity = '{}-{}-{}'.format(self.get_hostname(), 'virt', vm_name) measurement.reporting_entity_id = reporting_entity measurement.reporting_entity_name = reporting_entity + # virt_vcpu + virt_vcpus = self.cache_get_value(plugin_instance=vm_name, + plugin_name='virt', type_name='virt_vcpu') + if len(virt_vcpus) > 0: + for virt_vcpu in virt_vcpus: + cpu_usage = self.cpu_ns_to_percentage(virt_vcpu) + measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage) # virt_cpu_total virt_vcpu_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', type_name='virt_cpu_total') if len(virt_vcpu_total) > 0: - measurement.aggregate_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0]) + average_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0]) / len(virt_vcpus) + measurement.aggregate_cpu_usage = average_cpu_usage # set source as a host for virt_vcpu_total value measurement.source_id = virt_vcpu_total[0]['host'] measurement.source_name = measurement.source_id # fill out EpochMicrosec (convert to us) measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000) - # virt_vcp - virt_vcpus = self.cache_get_value(plugin_instance=vm_name, - plugin_name='virt', type_name='virt_vcpu') - if len(virt_vcpus) > 0: - for virt_vcpu in virt_vcpus: - cpu_usage = self.cpu_ns_to_percentage(virt_vcpu) - measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage) # plugin interval measurement.measurement_interval = self.__plugin_data_cache['virt']['interval'] # memory-total @@ -456,7 +471,7 @@ class VESPlugin(object): '-{}'.format(val['type_instance']) if len(val['type_instance']) else '') mgroup = MeasurementGroup(mgroup_name) ds = collectd.get_dataset(val['type']) - for index in xrange(len(ds)): + for index in range(len(ds)): mname = '{}-{}'.format(val['type'], ds[index][0]) mgroup.add_measurement(mname, str(val['values'][index])) measurement.add_measurement_group(mgroup); @@ -476,6 +491,10 @@ class VESPlugin(object): pre_total_time, pre_total, total_time, total, round(percent, 2))) return round(percent, 2) + def make_dash_string(self, *args): + """Join non empty strings with dash symbol""" + return '-'.join(filter(lambda x: len(x) > 0, args)) + def config(self, config): """Collectd config callback""" for child in config.children: @@ -485,8 +504,9 @@ class VESPlugin(object): raise RuntimeError('Configuration key name error') # check the config entry value type if len(child.values) == 0 or type(child.values[0]) != type(self.__plugin_config[child.key]): - collectd.error("Key '{}' value type should be {}".format( - child.key, str(type(self.__plugin_config[child.key])))) + collectd.error("Key '{}' value type '{}' should be {}".format( + child.key, str(type(child.values[0])), + str(type(self.__plugin_config[child.key])))) raise RuntimeError('Configuration key value error') # store the value in configuration self.__plugin_config[child.key] = child.values[0] @@ -505,7 +525,7 @@ class VESPlugin(object): if vl.plugin not in self.__plugin_data_cache: self.__plugin_data_cache[vl.plugin] = {'vls': []} plugin_vl = self.__plugin_data_cache[vl.plugin]['vls'] - for index in xrange(len(plugin_vl)): + for index in range(len(plugin_vl)): # record found, so just update time the values if (plugin_vl[index]['plugin_instance'] == vl.plugin_instance) and (plugin_vl[index]['type_instance'] == @@ -593,10 +613,8 @@ class VESPlugin(object): fault.last_epoch_micro_sec = fault.start_epoch_microsec # fill out fault header fault.event_severity = collectd_event_severity_map[n.severity] - fault.specific_problem = '{}{}'.format('{}-'.format(n.plugin_instance - if len(n.plugin_instance) else ''), n.type_instance) - fault.alarm_interface_a = '{}{}'.format(n.plugin, '-{}'.format( - n.plugin_instance if len(n.plugin_instance) else '')) + fault.specific_problem = self.make_dash_string(n.plugin_instance, n.type_instance) + fault.alarm_interface_a = self.make_dash_string(n.plugin, n.plugin_instance) fault.event_source_type = 'virtualMachine(8)' if self.__plugin_config['GuestRunning'] else 'host(3)' fault.alarm_condition = n.message self.event_send(fault) -- cgit 1.2.3-korg