From 6facb2d37042ea8f189300e07dd97ba0ad6492ad Mon Sep 17 00:00:00 2001 From: "Mytnyk, VolodymyrX" Date: Wed, 8 Feb 2017 16:01:05 +0000 Subject: VES plugin: update to latest VES event schema - Updated the plugin to lates VES schema - Migrated VES plugin to python3 (with backward compatibility) - Fixed issue with CPU total calculation - Fixed VES documentation Change-Id: Ic8b0419146a9c75a48907f39adda1351f3b3bc73 Signed-off-by: Mytnyk, VolodymyrX --- .../collectd-ves-plugin/ves_plugin/ves_plugin.py | 82 +++++++++++++--------- docs/release/userguide/collectd.ves.userguide.rst | 41 +++++++---- 2 files changed, 76 insertions(+), 47 deletions(-) diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py index 3e300893..1ab8f679 100644 --- a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py +++ b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py @@ -24,7 +24,12 @@ import collectd import json import sys import base64 -import urllib2 +try: + # For Python 3.0 and later + import urllib.request as url +except ImportError: + # Fall back to Python 2's urllib2 + import urllib2 as url import socket import time from threading import Timer @@ -70,7 +75,7 @@ class Event(object): 'commonEventHeader' : obj, self.get_name() : self.get_obj() } - }) + }).encode() def get_name(): assert False, 'abstract method get_name() is not implemented' @@ -114,7 +119,12 @@ class MeasurementsForVfScaling(Event): self.concurrent_sessions = 0 self.configured_entities = 0 self.cpu_usage_array = [] - self.errors = [] + self.errors = { + "receiveDiscards" : 0, + "receiveErrors" : 0, + "transmitDiscards" : 0, + "transmitErrors" : 0 + } self.feature_usage_array = [] self.filesystem_usage_array = [] self.latency_distribution = [] @@ -240,11 +250,11 @@ class VESPlugin(object): 'UseHttps' : False, 'SendEventInterval' : 20.0, 'FunctionalRole' : 'Collectd VES Agent', - 'GuestRunning' : False + 'GuestRunning' : False, + 'ApiVersion' : 1.0 } self.__host_name = None self.__ves_timer = None - self.__event_timer_interval = 20.0 self.__lock = Lock() self.__event_id = 0 @@ -263,7 +273,7 @@ class VESPlugin(object): def start_timer(self): """Start event timer""" - self.__ves_timer = Timer(self.__event_timer_interval, self.__on_time) + self.__ves_timer = Timer(self.__plugin_config['SendEventInterval'], self.__on_time) self.__ves_timer.start() def stop_timer(self): @@ -272,30 +282,34 @@ class VESPlugin(object): def __on_time(self): """Timer thread""" - self.start_timer() self.event_timer() + self.start_timer() def event_send(self, event): """Send event to VES""" - server_url = "http{}://{}:{}/{}eventListener/v3{}".format( + server_url = "http{}://{}:{}{}/eventListener/v{}{}".format( 's' if self.__plugin_config['UseHttps'] else '', self.__plugin_config['Domain'], - int(self.__plugin_config['Port']), '{}/'.format( - '/{}'.format(self.__plugin_config['Path'])) if (len(self.__plugin_config['Path']) > 0) else '', - self.__plugin_config['Topic']) + int(self.__plugin_config['Port']), '{}'.format( + '/{}'.format(self.__plugin_config['Path']) if (len(self.__plugin_config['Path']) > 0) else ''), + int(self.__plugin_config['ApiVersion']), '{}'.format( + '/{}'.format(self.__plugin_config['Topic']) if (len(self.__plugin_config['Topic']) > 0) else '')) collectd.info('Vendor Event Listener is at: {}'.format(server_url)) credentials = base64.b64encode('{}:{}'.format( - self.__plugin_config['Username'], self.__plugin_config['Password'])) + self.__plugin_config['Username'], self.__plugin_config['Password']).encode()).decode() collectd.info('Authentication credentials are: {}'.format(credentials)) try: - request = urllib2.Request(server_url) + request = url.Request(server_url) request.add_header('Authorization', 'Basic {}'.format(credentials)) request.add_header('Content-Type', 'application/json') collectd.debug("Sending {} to {}".format(event.get_json(), server_url)) - vel = urllib2.urlopen(request, event.get_json(), timeout=1) - except urllib2.HTTPError as e: + vel = url.urlopen(request, event.get_json(), timeout=1) + collectd.debug("Sent data to {} successfully".format(server_url)) + except url.HTTPError as e: collectd.error('Vendor Event Listener exception: {}'.format(e)) - except urllib2.URLError as e: + except url.URLError as e: collectd.error('Vendor Event Listener is is not reachable: {}'.format(e)) + except: + collectd.error('Vendor Event Listener unknown error') def bytes_to_gb(self, bytes): """Convert bytes to GB""" @@ -388,23 +402,24 @@ class VESPlugin(object): reporting_entity = '{}-{}-{}'.format(self.get_hostname(), 'virt', vm_name) measurement.reporting_entity_id = reporting_entity measurement.reporting_entity_name = reporting_entity + # virt_vcpu + virt_vcpus = self.cache_get_value(plugin_instance=vm_name, + plugin_name='virt', type_name='virt_vcpu') + if len(virt_vcpus) > 0: + for virt_vcpu in virt_vcpus: + cpu_usage = self.cpu_ns_to_percentage(virt_vcpu) + measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage) # virt_cpu_total virt_vcpu_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', type_name='virt_cpu_total') if len(virt_vcpu_total) > 0: - measurement.aggregate_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0]) + average_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0]) / len(virt_vcpus) + measurement.aggregate_cpu_usage = average_cpu_usage # set source as a host for virt_vcpu_total value measurement.source_id = virt_vcpu_total[0]['host'] measurement.source_name = measurement.source_id # fill out EpochMicrosec (convert to us) measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000) - # virt_vcp - virt_vcpus = self.cache_get_value(plugin_instance=vm_name, - plugin_name='virt', type_name='virt_vcpu') - if len(virt_vcpus) > 0: - for virt_vcpu in virt_vcpus: - cpu_usage = self.cpu_ns_to_percentage(virt_vcpu) - measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage) # plugin interval measurement.measurement_interval = self.__plugin_data_cache['virt']['interval'] # memory-total @@ -456,7 +471,7 @@ class VESPlugin(object): '-{}'.format(val['type_instance']) if len(val['type_instance']) else '') mgroup = MeasurementGroup(mgroup_name) ds = collectd.get_dataset(val['type']) - for index in xrange(len(ds)): + for index in range(len(ds)): mname = '{}-{}'.format(val['type'], ds[index][0]) mgroup.add_measurement(mname, str(val['values'][index])) measurement.add_measurement_group(mgroup); @@ -476,6 +491,10 @@ class VESPlugin(object): pre_total_time, pre_total, total_time, total, round(percent, 2))) return round(percent, 2) + def make_dash_string(self, *args): + """Join non empty strings with dash symbol""" + return '-'.join(filter(lambda x: len(x) > 0, args)) + def config(self, config): """Collectd config callback""" for child in config.children: @@ -485,8 +504,9 @@ class VESPlugin(object): raise RuntimeError('Configuration key name error') # check the config entry value type if len(child.values) == 0 or type(child.values[0]) != type(self.__plugin_config[child.key]): - collectd.error("Key '{}' value type should be {}".format( - child.key, str(type(self.__plugin_config[child.key])))) + collectd.error("Key '{}' value type '{}' should be {}".format( + child.key, str(type(child.values[0])), + str(type(self.__plugin_config[child.key])))) raise RuntimeError('Configuration key value error') # store the value in configuration self.__plugin_config[child.key] = child.values[0] @@ -505,7 +525,7 @@ class VESPlugin(object): if vl.plugin not in self.__plugin_data_cache: self.__plugin_data_cache[vl.plugin] = {'vls': []} plugin_vl = self.__plugin_data_cache[vl.plugin]['vls'] - for index in xrange(len(plugin_vl)): + for index in range(len(plugin_vl)): # record found, so just update time the values if (plugin_vl[index]['plugin_instance'] == vl.plugin_instance) and (plugin_vl[index]['type_instance'] == @@ -593,10 +613,8 @@ class VESPlugin(object): fault.last_epoch_micro_sec = fault.start_epoch_microsec # fill out fault header fault.event_severity = collectd_event_severity_map[n.severity] - fault.specific_problem = '{}{}'.format('{}-'.format(n.plugin_instance - if len(n.plugin_instance) else ''), n.type_instance) - fault.alarm_interface_a = '{}{}'.format(n.plugin, '-{}'.format( - n.plugin_instance if len(n.plugin_instance) else '')) + fault.specific_problem = self.make_dash_string(n.plugin_instance, n.type_instance) + fault.alarm_interface_a = self.make_dash_string(n.plugin, n.plugin_instance) fault.event_source_type = 'virtualMachine(8)' if self.__plugin_config['GuestRunning'] else 'host(3)' fault.alarm_condition = n.message self.event_send(fault) diff --git a/docs/release/userguide/collectd.ves.userguide.rst b/docs/release/userguide/collectd.ves.userguide.rst index 3cf26004..7ed6be9e 100644 --- a/docs/release/userguide/collectd.ves.userguide.rst +++ b/docs/release/userguide/collectd.ves.userguide.rst @@ -61,36 +61,42 @@ REST resources are of the form: {ServerRoot}/eventListener/v{apiVersion}/{topicName}` {ServerRoot}/eventListener/v{apiVersion}/eventBatch` - **Domain** *"host"* -* VES domain name. It can be IP address or hostname of VES collector -(default: `127.0.0.1`) + VES domain name. It can be IP address or hostname of VES collector + (default: `127.0.0.1`) **Port** *port* -* VES port (default: `30000`) + VES port (default: `30000`) **Path** *"path"* -* Used as the "optionalRoutingPath" element in the REST path (default: `empty`) + Used as the "optionalRoutingPath" element in the REST path (default: `empty`) **Topic** *"path"* -* Used as the "topicName" element in the REST path (default: `empty`) + Used as the "topicName" element in the REST path (default: `empty`) **UseHttps** *true|false* -* Allow plugin to use HTTPS instead of HTTP (default: `false`) + Allow plugin to use HTTPS instead of HTTP (default: `false`) **Username** *"username"* -* VES collector user name (default: `empty`) + VES collector user name (default: `empty`) **Password** *"passwd"* -* VES collector password (default: `empty`) + VES collector password (default: `empty`) **FunctionalRole** *"role"* -* Used as the 'functionalRole' field of 'commonEventHeader' event (default: -`Collectd VES Agent`) + Used as the 'functionalRole' field of 'commonEventHeader' event (default: + `Collectd VES Agent`) **GuestRunning** *true|false* -* This option is used if the collectd is running on a guest machine, e.g this -option should be set to `true` in this case. Defaults to `false`. + This option is used if the collectd is running on a guest machine, e.g this + option should be set to `true` in this case. Defaults to `false`. + +**SendEventInterval** *interval* + This configuration option controls how often (sec) collectd data is sent to + Vendor Event Listener (default: `20`) + +**ApiVersion** *version* + Used as the "apiVersion" element in the REST path (default: `1`) Other collectd.conf configurations ---------------------------------- @@ -100,7 +106,6 @@ Please ensure that FQDNLookup is set to false FQDNLookup false - Please ensure that the virt plugin is enabled and configured as follows. This configuration is is required only on a host side ('GuestRunning' = false). @@ -125,7 +130,11 @@ Please ensure that the cpu plugin is enabled and configured as follows ValuesPercentage true +**Note**: The `ReportByCpu` option should be set to `true` (default) if VES pugin +is running on guest machine ('GuestRunning' = true). + Please ensure that the aggregation plugin is enabled and configured as follows +(required if 'GuestRunning' = true) .. code:: bash @@ -146,7 +155,9 @@ If plugin is running on a guest side, it is important to enable uuid plugin too. In this case the hostname in event message will be represented as UUID instead of system host name. -LoadPlugin uuid +.. code:: bash + + LoadPlugin uuid If custom UUID needs to be provided, the following configuration is required in collectd.conf file: -- cgit 1.2.3-korg