diff options
Diffstat (limited to '3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py')
-rw-r--r-- | 3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py | 592 |
1 files changed, 302 insertions, 290 deletions
diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py index ee27bcdf..4c313cee 100644 --- a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py +++ b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py @@ -1,29 +1,27 @@ -# MIT License +#!/usr/bin/env python # -# Copyright(c) 2016-2017 Intel Corporation. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the "Software"), -# to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, -# and/or sell copies of the Software, and to permit persons to whom the -# Software is furnished to do so, subject to the following conditions: +# http://www.apache.org/licenses/LICENSE-2.0 # -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN THE SOFTWARE. - -import collectd +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import json import sys import base64 +import ConfigParser +import logging +import argparse + +from distutils.util import strtobool +from kafka import KafkaConsumer + try: # For Python 3.0 and later import urllib.request as url @@ -35,13 +33,14 @@ import time from threading import Timer from threading import Lock + class Event(object): """Event header""" def __init__(self): """Construct the common header""" self.version = 2.0 - self.event_type = "Info" # use "Info" unless a notification is generated + self.event_type = "Info" # use "Info" unless a notification is generated self.domain = "" self.event_id = "" self.source_id = "" @@ -49,7 +48,7 @@ class Event(object): self.functional_role = "" self.reporting_entity_id = "" self.reporting_entity_name = "" - self.priority = "Normal" # will be derived from event if there is one + self.priority = "Normal" # will be derived from event if there is one self.start_epoch_microsec = 0 self.last_epoch_micro_sec = 0 self.sequence = 0 @@ -78,9 +77,9 @@ class Event(object): obj['nfcNamingCode'] = self.nfc_naming_code obj['nfNamingCode'] = self.nf_naming_code return json.dumps({ - 'event' : { - 'commonEventHeader' : obj, - self.get_name() : self.get_obj() + 'event': { + 'commonEventHeader': obj, + self.get_name(): self.get_obj() } }).encode() @@ -90,6 +89,7 @@ class Event(object): def get_obj(): assert False, 'abstract method get_obj() is not implemented' + class Field(object): """field datatype""" @@ -99,10 +99,11 @@ class Field(object): def get_obj(self): return { - 'name' : self.name, - 'value' : self.value + 'name': self.name, + 'value': self.value } + class NamedArrayOfFields(object): """namedArrayOfFields datatype""" @@ -115,10 +116,11 @@ class NamedArrayOfFields(object): def get_obj(self): return { - 'name' : self.name, - 'arrayOfFields' : self.array_of_fields + 'name': self.name, + 'arrayOfFields': self.array_of_fields } + class VESDataType(object): """ Base VES datatype """ @@ -126,56 +128,57 @@ class VESDataType(object): if val is not None: obj[key] = val + class DiskUsage(VESDataType): """diskUsage datatype""" def __init__(self, identifier): - self.disk_identifier = identifier - self.disk_io_time_avg = None - self.disk_io_time_last = None - self.disk_io_time_max = None - self.disk_io_time_min = None - self.disk_merged_read_avg = None - self.disk_merged_read_last = None - self.disk_merged_read_max = None - self.disk_merged_read_min = None - self.disk_merged_write_avg = None - self.disk_merged_write_last = None - self.disk_merged_write_max = None - self.disk_merged_write_min = None - self.disk_octets_read_avg = None - self.disk_octets_read_last = None - self.disk_octets_read_max = None - self.disk_octets_read_min = None - self.disk_octets_write_avg = None - self.disk_octets_write_last = None - self.disk_octets_write_max = None - self.disk_octets_write_min = None - self.disk_ops_read_avg = None - self.disk_ops_read_last = None - self.disk_ops_read_max = None - self.disk_ops_read_min = None - self.disk_ops_write_avg = None - self.disk_ops_write_last = None - self.disk_ops_write_max = None - self.disk_ops_write_min = None - self.disk_pending_operations_avg = None - self.disk_pending_operations_last = None - self.disk_pending_operations_max = None - self.disk_pending_operations_min = None - self.disk_time_read_avg = None - self.disk_time_read_last = None - self.disk_time_read_max = None - self.disk_time_read_min = None - self.disk_time_write_avg = None - self.disk_time_write_last = None - self.disk_time_write_max = None - self.disk_time_write_min = None + self.disk_identifier = identifier + self.disk_io_time_avg = None + self.disk_io_time_last = None + self.disk_io_time_max = None + self.disk_io_time_min = None + self.disk_merged_read_avg = None + self.disk_merged_read_last = None + self.disk_merged_read_max = None + self.disk_merged_read_min = None + self.disk_merged_write_avg = None + self.disk_merged_write_last = None + self.disk_merged_write_max = None + self.disk_merged_write_min = None + self.disk_octets_read_avg = None + self.disk_octets_read_last = None + self.disk_octets_read_max = None + self.disk_octets_read_min = None + self.disk_octets_write_avg = None + self.disk_octets_write_last = None + self.disk_octets_write_max = None + self.disk_octets_write_min = None + self.disk_ops_read_avg = None + self.disk_ops_read_last = None + self.disk_ops_read_max = None + self.disk_ops_read_min = None + self.disk_ops_write_avg = None + self.disk_ops_write_last = None + self.disk_ops_write_max = None + self.disk_ops_write_min = None + self.disk_pending_operations_avg = None + self.disk_pending_operations_last = None + self.disk_pending_operations_max = None + self.disk_pending_operations_min = None + self.disk_time_read_avg = None + self.disk_time_read_last = None + self.disk_time_read_max = None + self.disk_time_read_min = None + self.disk_time_write_avg = None + self.disk_time_write_last = None + self.disk_time_write_max = None + self.disk_time_write_min = None def get_obj(self): obj = { # required - 'diskIdentifier' : self.disk_identifier + 'diskIdentifier': self.disk_identifier } self.set_optional(obj, 'diskIoTimeAvg', self.disk_io_time_avg) self.set_optional(obj, 'diskIoTimeLast', self.disk_io_time_last) @@ -219,46 +222,47 @@ class DiskUsage(VESDataType): self.set_optional(obj, 'diskTimeWriteMin', self.disk_time_write_min) return obj + class VNicPerformance(VESDataType): """vNicPerformance datatype""" def __init__(self, identifier): - self.received_broadcast_packets_accumulated = None - self.received_broadcast_packets_delta = None - self.received_discarded_packets_accumulated = None - self.received_discarded_packets_delta = None - self.received_error_packets_accumulated = None - self.received_error_packets_delta = None - self.received_multicast_packets_accumulated = None - self.received_multicast_packets_delta = None - self.received_octets_accumulated = None - self.received_octets_delta = None - self.received_total_packets_accumulated = None - self.received_total_packets_delta = None - self.received_unicast_packets_accumulated = None - self.received_unicast_packets_delta = None - self.transmitted_broadcast_packets_accumulated = None - self.transmitted_broadcast_packets_delta = None - self.transmitted_discarded_packets_accumulated = None - self.transmitted_discarded_packets_delta = None - self.transmitted_error_packets_accumulated = None - self.transmitted_error_packets_delta = None - self.transmitted_multicast_packets_accumulated = None - self.transmitted_multicast_packets_delta = None - self.transmitted_octets_accumulated = None - self.transmitted_octets_delta = None - self.transmitted_total_packets_accumulated = None - self.transmitted_total_packets_delta = None - self.transmitted_unicast_packets_accumulated = None - self.transmitted_unicast_packets_delta = None - self.values_are_suspect = 'true' - self.v_nic_identifier = identifier + self.received_broadcast_packets_accumulated = None + self.received_broadcast_packets_delta = None + self.received_discarded_packets_accumulated = None + self.received_discarded_packets_delta = None + self.received_error_packets_accumulated = None + self.received_error_packets_delta = None + self.received_multicast_packets_accumulated = None + self.received_multicast_packets_delta = None + self.received_octets_accumulated = None + self.received_octets_delta = None + self.received_total_packets_accumulated = None + self.received_total_packets_delta = None + self.received_unicast_packets_accumulated = None + self.received_unicast_packets_delta = None + self.transmitted_broadcast_packets_accumulated = None + self.transmitted_broadcast_packets_delta = None + self.transmitted_discarded_packets_accumulated = None + self.transmitted_discarded_packets_delta = None + self.transmitted_error_packets_accumulated = None + self.transmitted_error_packets_delta = None + self.transmitted_multicast_packets_accumulated = None + self.transmitted_multicast_packets_delta = None + self.transmitted_octets_accumulated = None + self.transmitted_octets_delta = None + self.transmitted_total_packets_accumulated = None + self.transmitted_total_packets_delta = None + self.transmitted_unicast_packets_accumulated = None + self.transmitted_unicast_packets_delta = None + self.values_are_suspect = 'true' + self.v_nic_identifier = identifier def get_obj(self): obj = { # required - 'valuesAreSuspect' : self.values_are_suspect, - 'vNicIdentifier' : self.v_nic_identifier + 'valuesAreSuspect': self.values_are_suspect, + 'vNicIdentifier': self.v_nic_identifier } # optional self.set_optional(obj, 'receivedBroadcastPacketsAccumulated', self.received_broadcast_packets_accumulated) @@ -291,6 +295,7 @@ class VNicPerformance(VESDataType): self.set_optional(obj, 'transmittedUnicastPacketsDelta', self.transmitted_unicast_packets_delta) return obj + class CpuUsage(VESDataType): """cpuUsage datatype""" @@ -309,8 +314,8 @@ class CpuUsage(VESDataType): def get_obj(self): obj = { # required - 'cpuIdentifier' : self.cpu_identifier, - 'percentUsage' : self.percent_usage + 'cpuIdentifier': self.cpu_identifier, + 'percentUsage': self.percent_usage } # optional self.set_optional(obj, 'cpuIdle', self.cpu_idle) @@ -323,6 +328,7 @@ class CpuUsage(VESDataType): self.set_optional(obj, 'cpuWait', self.cpu_wait) return obj + class MemoryUsage(VESDataType): """memoryUsage datatype""" @@ -338,14 +344,14 @@ class MemoryUsage(VESDataType): def __str__(self): """ for debug purposes """ - return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n'\ - 'cached : {cached}\nconfigured : {configured}\nfree : {free}\n'\ - 'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n'\ - 'used : {used}\n'.format(buffered = self.memory_buffered, - cached = self.memory_cached, configured = self.memory_configured, - free = self.memory_free, slab_recl = self.memory_slab_recl, - slab_unrecl = self.memory_slab_unrecl, used = self.memory_used, - vm_identifier = self.vm_identifier) + return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n' \ + 'cached : {cached}\nconfigured : {configured}\nfree : {free}\n' \ + 'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n' \ + 'used : {used}\n'.format(buffered=self.memory_buffered, + cached=self.memory_cached, configured=self.memory_configured, + free=self.memory_free, slab_recl=self.memory_slab_recl, + slab_unrecl=self.memory_slab_unrecl, used=self.memory_used, + vm_identifier=self.vm_identifier) def get_memory_free(self): if self.memory_free is None: @@ -362,10 +368,10 @@ class MemoryUsage(VESDataType): if self.memory_used is None: # calculate the memory used if None not in (self.memory_configured, self.memory_free, self.memory_buffered, - self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): + self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): return self.memory_configured - (self.memory_free + - self.memory_buffered + self.memory_cached + - self.memory_slab_recl + self.memory_slab_unrecl) + self.memory_buffered + self.memory_cached + + self.memory_slab_recl + self.memory_slab_unrecl) else: # required field, so return zero return 0 @@ -376,10 +382,10 @@ class MemoryUsage(VESDataType): if self.memory_configured is None: # calculate the total memory if None not in (self.memory_used, self.memory_free, self.memory_buffered, - self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): + self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): return (self.memory_used + self.memory_free + - self.memory_buffered + self.memory_cached + - self.memory_slab_recl + self.memory_slab_unrecl) + self.memory_buffered + self.memory_cached + + self.memory_slab_recl + self.memory_slab_unrecl) else: return None else: @@ -388,9 +394,9 @@ class MemoryUsage(VESDataType): def get_obj(self): obj = { # required fields - 'memoryFree' : self.get_memory_free(), - 'memoryUsed' : self.get_memory_used(), - 'vmIdentifier' : self.vm_identifier + 'memoryFree': self.get_memory_free(), + 'memoryUsed': self.get_memory_used(), + 'vmIdentifier': self.vm_identifier } # optional fields self.set_optional(obj, 'memoryBuffered', self.memory_buffered) @@ -400,6 +406,7 @@ class MemoryUsage(VESDataType): self.set_optional(obj, 'memorySlabUnrecl', self.memory_slab_unrecl) return obj + class MeasurementsForVfScaling(Event): """MeasurementsForVfScaling datatype""" @@ -477,6 +484,7 @@ class MeasurementsForVfScaling(Event): """Name of datatype""" return "measurementsForVfScalingFields" + class Fault(Event): """Fault datatype""" @@ -516,29 +524,32 @@ class Fault(Event): obj['eventCategory'] = self.event_category return obj -class VESPlugin(object): - """VES plugin with collectd callbacks""" + +class VESApp(object): + """VES Application""" def __init__(self): - """Plugin initialization""" + """Application initialization""" self.__plugin_data_cache = { - 'cpu' : {'interval' : 0.0, 'vls' : []}, - 'virt' : {'interval' : 0.0, 'vls' : []}, - 'disk' : {'interval' : 0.0, 'vls' : []}, - 'interface' : {'interval' : 0.0, 'vls' : []}, - 'memory' : {'interval' : 0.0, 'vls' : []} + 'cpu': {'interval': 0.0, 'vls': []}, + 'virt': {'interval': 0.0, 'vls': []}, + 'disk': {'interval': 0.0, 'vls': []}, + 'interface': {'interval': 0.0, 'vls': []}, + 'memory': {'interval': 0.0, 'vls': []} } - self.__plugin_config = { - 'Domain' : '127.0.0.1', - 'Port' : 30000.0, - 'Path' : '', - 'Username' : '', - 'Password' : '', - 'Topic' : '', - 'UseHttps' : False, - 'SendEventInterval' : 20.0, - 'FunctionalRole' : 'Collectd VES Agent', - 'ApiVersion' : 5.1 + self.__app_config = { + 'Domain': '127.0.0.1', + 'Port': 30000, + 'Path': '', + 'Username': '', + 'Password': '', + 'Topic': '', + 'UseHttps': False, + 'SendEventInterval': 20.0, + 'FunctionalRole': 'Collectd VES Agent', + 'ApiVersion': 5.1, + 'KafkaPort': 9092, + 'KafkaBroker': 'localhost' } self.__host_name = None self.__ves_timer = None @@ -551,16 +562,16 @@ class VESPlugin(object): return str(self.__event_id) def lock(self): - """Lock the plugin""" + """Lock the application""" self.__lock.acquire() def unlock(self): - """Unlock the plugin""" + """Unlock the application""" self.__lock.release() def start_timer(self): """Start event timer""" - self.__ves_timer = Timer(self.__plugin_config['SendEventInterval'], self.__on_time) + self.__ves_timer = Timer(self.__app_config['SendEventInterval'], self.__on_time) self.__ves_timer.start() def stop_timer(self): @@ -575,36 +586,34 @@ class VESPlugin(object): def event_send(self, event): """Send event to VES""" server_url = "http{}://{}:{}{}/eventListener/v{}{}".format( - 's' if self.__plugin_config['UseHttps'] else '', self.__plugin_config['Domain'], - int(self.__plugin_config['Port']), '{}'.format( - '/{}'.format(self.__plugin_config['Path']) if (len(self.__plugin_config['Path']) > 0) else ''), - int(self.__plugin_config['ApiVersion']), '{}'.format( - '/{}'.format(self.__plugin_config['Topic']) if (len(self.__plugin_config['Topic']) > 0) else '')) - collectd.info('Vendor Event Listener is at: {}'.format(server_url)) + 's' if self.__app_config['UseHttps'] else '', self.__app_config['Domain'], + int(self.__app_config['Port']), '{}'.format( + '/{}'.format(self.__app_config['Path']) if (len(self.__app_config['Path']) > 0) else ''), + int(self.__app_config['ApiVersion']), '{}'.format( + '/{}'.format(self.__app_config['Topic']) if (len(self.__app_config['Topic']) > 0) else '')) + logging.info('Vendor Event Listener is at: {}'.format(server_url)) credentials = base64.b64encode('{}:{}'.format( - self.__plugin_config['Username'], self.__plugin_config['Password']).encode()).decode() - collectd.info('Authentication credentials are: {}'.format(credentials)) + self.__app_config['Username'], self.__app_config['Password']).encode()).decode() + logging.info('Authentication credentials are: {}'.format(credentials)) try: request = url.Request(server_url) request.add_header('Authorization', 'Basic {}'.format(credentials)) request.add_header('Content-Type', 'application/json') - collectd.debug("Sending {} to {}".format(event.get_json(), server_url)) + logging.debug("Sending {} to {}".format(event.get_json(), server_url)) vel = url.urlopen(request, event.get_json(), timeout=1) - collectd.debug("Sent data to {} successfully".format(server_url)) + logging.debug("Sent data to {} successfully".format(server_url)) except url.HTTPError as e: - collectd.error('Vendor Event Listener exception: {}'.format(e)) + logging.error('Vendor Event Listener exception: {}'.format(e)) except url.URLError as e: - collectd.error('Vendor Event Listener is is not reachable: {}'.format(e)) - except: - collectd.error('Vendor Event Listener unknown error') + logging.error('Vendor Event Listener is is not reachable: {}'.format(e)) + except Exception as e: + logging.error('Vendor Event Listener error: {}'.format(e)) def bytes_to_kb(self, bytes): """Convert bytes to kibibytes""" return round((bytes / 1024.0), 3) def get_hostname(self): - if len(self.__host_name): - return self.__host_name return socket.gethostname() def send_host_measurements(self): @@ -619,15 +628,16 @@ class VESPlugin(object): us_up_to_date = True for vm_value in vm_values: if vm_value['updated'] == False: + logging.warning("%s" % vm_value) us_up_to_date = False break if not us_up_to_date: - # one of the cache value is not up-to-date, break - collectd.warning("virt collectD cache values are not up-to-date for {}".format(vm_name)) - continue + # one of the cache value is not up-to-date, break + logging.warning("virt collectd cache values are not up-to-date for {}".format(vm_name)) + continue # values are up-to-date, create an event message measurement = MeasurementsForVfScaling(self.get_event_id()) - measurement.functional_role = self.__plugin_config['FunctionalRole'] + measurement.functional_role = self.__app_config['FunctionalRole'] # fill out reporting_entity measurement.reporting_entity_id = self.get_hostname() measurement.reporting_entity_name = measurement.reporting_entity_id @@ -643,7 +653,7 @@ class VESPlugin(object): memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', type_name='memory', type_instance='total') memory_unused = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='memory', type_instance='unused') + type_name='memory', type_instance='unused') memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', type_name='memory', type_instance='rss') if len(memory_total) > 0: @@ -655,7 +665,7 @@ class VESPlugin(object): # since, "used" metric is not provided by virt plugn, set the rest of the memory stats # to zero to calculate used based on provided stats only mem_usage.memory_buffered = mem_usage.memory_cached = mem_usage.memory_slab_recl = \ - mem_usage.memory_slab_unrecl = 0 + mem_usage.memory_slab_unrecl = 0 measurement.add_memory_usage(mem_usage) # cpuUsage virt_vcpus = self.cache_get_value(plugin_instance=vm_name, @@ -672,9 +682,9 @@ class VESPlugin(object): if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', type_name='if_packets', type_instance=if_name) if_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_octets', type_instance=if_name) + type_name='if_octets', type_instance=if_name) if_errors = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_errors', type_instance=if_name) + type_name='if_errors', type_instance=if_name) if_dropped = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', type_name='if_dropped', type_instance=if_name) v_nic_performance = VNicPerformance(if_name) @@ -695,7 +705,7 @@ class VESPlugin(object): disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', type_name='disk_octets', type_instance=disk_name) disk_ops = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='disk_ops', type_instance=disk_name) + type_name='disk_ops', type_instance=disk_name) disk_usage = DiskUsage(disk_name) disk_usage.disk_octets_read_last = disk_octets[0]['values'][0] disk_usage.disk_octets_write_last = disk_octets[0]['values'][1] @@ -714,8 +724,8 @@ class VESPlugin(object): # send event to the VES self.event_send(measurement) if len(vm_names) > 0: - # mark the additional measurements metrics as read - self.mark_cache_values_as_read(exclude_plugins=['virt']) + # mark the additional measurements metrics as read + self.mark_cache_values_as_read(exclude_plugins=['virt']) def event_timer(self): """Event timer thread""" @@ -746,9 +756,9 @@ class VESPlugin(object): array_name = self.make_dash_string(plugin_name, val['plugin_instance'], val['type_instance']) named_array = NamedArrayOfFields(array_name) - ds = collectd.get_dataset(val['type']) - for index in range(len(ds)): - mname = '{}-{}'.format(val['type'], ds[index][0]) + + for index in range(len(val['dsnames'])): + mname = '{}-{}'.format(val['type'], val['dsnames'][index]) named_array.add(Field(mname, str(val['values'][index]))) measurement.add_additional_measurement(named_array); val['updated'] = False @@ -763,9 +773,9 @@ class VESPlugin(object): if val['updated']: name_prefix = self.make_dash_string(plugin_name, val['plugin_instance'], val['type_instance']) - ds = collectd.get_dataset(val['type']) - for index in range(len(ds)): - field_name = self.make_dash_string(name_prefix, val['type'], ds[index][0]) + + for index in range(len(val['dsnames'])): + field_name = self.make_dash_string(name_prefix, val['type'], val['dsnames'][index]) measurement.add_additional_fields(Field(field_name, str(val['values'][index]))) def cpu_ns_to_percentage(self, vl): @@ -777,8 +787,8 @@ class VESPlugin(object): if (total_time - pre_total_time) == 0: # return zero usage if time diff is zero return 0.0 - percent = (100.0 * (total - pre_total))/((total_time - pre_total_time) * 1000000000.0) - collectd.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format( + percent = (100.0 * (total - pre_total)) / ((total_time - pre_total_time) * 1000000000.0) + logging.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format( pre_total_time, pre_total, total_time, total, round(percent, 2))) return round(percent, 2) @@ -787,62 +797,71 @@ class VESPlugin(object): return '-'.join(filter(lambda x: len(x) > 0, args)) def config(self, config): - """Collectd config callback""" - for child in config.children: - # check the config entry name - if child.key not in self.__plugin_config: - collectd.error("Key '{}' name is invalid".format(child.key)) - raise RuntimeError('Configuration key name error') - # check the config entry value type - if len(child.values) == 0 or type(child.values[0]) != type(self.__plugin_config[child.key]): - collectd.error("Key '{}' value type '{}' should be {}".format( - child.key, str(type(child.values[0])), - str(type(self.__plugin_config[child.key])))) - raise RuntimeError('Configuration key value error') - # store the value in configuration - self.__plugin_config[child.key] = child.values[0] + """VES option configuration""" + for key, value in config.items('config'): + if key in self.__app_config: + try: + if type(self.__app_config[key]) == int: + value = int(value) + elif type(self.__app_config[key]) == float: + value = float(value) + elif type(self.__app_config[key]) == bool: + value = bool(distutils.util.strtobool(value)) + + if isinstance(value, type(self.__app_config[key])): + self.__app_config[key] = value + else: + logging.error("Type mismatch with %s" % key) + sys.exit() + except ValueError: + logging.error("Incorrect value type for %s" % key) + sys.exit() + else: + logging.error("Incorrect key configuration %s" % key) + sys.exit() def init(self): - """Collectd init callback""" + """Initialisation""" # start the VES timer self.start_timer() - ## - # Please note, the cache should be locked before using this function - # - def update_cache_value(self, vl): - """Update value internal collectD cache values or create new one""" + def update_cache_value(self, kafka_data): + """Update value internal collectd cache values or create new one + Please note, the cache should be locked before using this function""" found = False - if vl.plugin not in self.__plugin_data_cache: - self.__plugin_data_cache[vl.plugin] = {'vls': []} - plugin_vl = self.__plugin_data_cache[vl.plugin]['vls'] + if kafka_data['plugin'] not in self.__plugin_data_cache: + self.__plugin_data_cache[kafka_data['plugin']] = {'vls': []} + plugin_vl = self.__plugin_data_cache[kafka_data['plugin']]['vls'] + for index in range(len(plugin_vl)): # record found, so just update time the values - if (plugin_vl[index]['plugin_instance'] == - vl.plugin_instance) and (plugin_vl[index]['type_instance'] == - vl.type_instance) and (plugin_vl[index]['type'] == vl.type): + if (plugin_vl[index]['plugin_instance'] == kafka_data['plugin_instance']) and \ + (plugin_vl[index]['type_instance'] == kafka_data['type_instance']) and \ + (plugin_vl[index]['type'] == kafka_data['type']): plugin_vl[index]['pre_time'] = plugin_vl[index]['time'] - plugin_vl[index]['time'] = vl.time + plugin_vl[index]['time'] = kafka_data['time'] plugin_vl[index]['pre_values'] = plugin_vl[index]['values'] - plugin_vl[index]['values'] = vl.values + plugin_vl[index]['values'] = kafka_data['values'] + plugin_vl[index]['dsnames'] = kafka_data['dsnames'] plugin_vl[index]['updated'] = True found = True break if not found: value = {} # create new cache record - value['plugin_instance'] = vl.plugin_instance - value['type_instance'] = vl.type_instance - value['values'] = vl.values - value['pre_values'] = vl.values - value['type'] = vl.type - value['time'] = vl.time - value['pre_time'] = vl.time - value['host'] = vl.host + value['plugin_instance'] = kafka_data['plugin_instance'] + value['type_instance'] = kafka_data['type_instance'] + value['values'] = kafka_data['values'] + value['pre_values'] = kafka_data['values'] + value['type'] = kafka_data['type'] + value['time'] = kafka_data['time'] + value['pre_time'] = kafka_data['time'] + value['host'] = kafka_data['host'] + value['dsnames'] = kafka_data['dsnames'] value['updated'] = True - self.__plugin_data_cache[vl.plugin]['vls'].append(value) + self.__plugin_data_cache[kafka_data['plugin']]['vls'].append(value) # update plugin interval based on one received in the value - self.__plugin_data_cache[vl.plugin]['interval'] = vl.interval + self.__plugin_data_cache[kafka_data['plugin']]['interval'] = kafka_data['interval'] def cache_get_value(self, plugin_name=None, plugin_instance=None, type_name=None, type_instance=None, type_names=None, mark_as_read=True): @@ -850,84 +869,77 @@ class VESPlugin(object): ret_list = [] if plugin_name in self.__plugin_data_cache: for val in self.__plugin_data_cache[plugin_name]['vls']: - #collectd.info("plugin={}, type={}, type_instance={}".format( - # plugin_name, val['type'], val['type_instance'])) - if (type_name == None or type_name == val['type']) and (plugin_instance == None - or plugin_instance == val['plugin_instance']) and (type_instance == None - or type_instance == val['type_instance']) and (type_names == None - or val['type'] in type_names): + if (type_name == None or type_name == val['type']) and \ + (plugin_instance == None or plugin_instance == val['plugin_instance']) and \ + (type_instance == None or type_instance == val['type_instance']) and \ + (type_names == None or val['type'] in type_names): if mark_as_read: val['updated'] = False ret_list.append(val) return ret_list - def write(self, vl, data=None): - """Collectd write callback""" - self.lock() - try: - # Example of collectD Value format - # collectd.Values(type='cpu',type_instance='interrupt', - # plugin='cpu',plugin_instance='25',host='localhost', - # time=1476694097.022873,interval=10.0,values=[0]) - if vl.plugin == 'ves_plugin': - # store the host name and unregister callback - self.__host_name = vl.host - collectd.unregister_read(self.read) - return - # update the cache values - self.update_cache_value(vl) - finally: - self.unlock() - - def read(self, data=None): - """Collectd read callback. Use this callback to get host name""" - vl = collectd.Values(type='gauge') - vl.plugin='ves_plugin' - vl.dispatch(values=[0]) - - def notify(self, n): - """Collectd notification callback""" - collectd_event_severity_map = { - collectd.NOTIF_FAILURE : 'CRITICAL', - collectd.NOTIF_WARNING : 'WARNING', - collectd.NOTIF_OKAY : 'NORMAL' - } - fault = Fault(self.get_event_id()) - # fill out common header - fault.event_type = "Notification" - fault.functional_role = self.__plugin_config['FunctionalRole'] - fault.reporting_entity_id = self.get_hostname() - fault.reporting_entity_name = self.get_hostname() - if n.plugin == 'virt': - # if the notification is generated by virt plugin, - # use the plugin_instance (e.g. VM name) as a source. - fault.source_id = str(n.plugin_instance) - fault.source_name = fault.source_id - else: - fault.source_id = self.get_hostname() - fault.source_name = self.get_hostname() - fault.start_epoch_microsec = (n.time * 1000000) - fault.last_epoch_micro_sec = fault.start_epoch_microsec - # fill out fault header - fault.event_severity = collectd_event_severity_map[n.severity] - fault.specific_problem = self.make_dash_string(n.plugin_instance, n.type_instance) - fault.alarm_interface_a = self.make_dash_string(n.plugin, n.plugin_instance) - fault.event_source_type = 'host(3)' - fault.alarm_condition = n.message - self.event_send(fault) - def shutdown(self): - """Collectd shutdown callback""" - # stop the timer + """Shutdown method for clean up""" self.stop_timer() -# The collectd plugin instance -plugin_instance = VESPlugin() - -# Register plugin callbacks -collectd.register_config(plugin_instance.config) -collectd.register_init(plugin_instance.init) -collectd.register_read(plugin_instance.read) -collectd.register_write(plugin_instance.write) -collectd.register_notification(plugin_instance.notify) -collectd.register_shutdown(plugin_instance.shutdown) + def run(self): + """Consumer JSON data from kafka broker""" + kafka_server = '%s:%s' % (self.__app_config.get('KafkaBroker'), self.__app_config.get('KafkaPort')) + consumer = KafkaConsumer('collectd', bootstrap_servers=kafka_server, auto_offset_reset='latest', + enable_auto_commit=False, + value_deserializer=lambda m: json.loads(m.decode('ascii'))) + + for message in consumer: + for kafka_data in message.value: + self.lock() + try: + # Receive Kafka data and update cache values + self.update_cache_value(kafka_data) + + finally: + self.unlock() + + +def main(): + # Parsing cmdline options + parser = argparse.ArgumentParser() + parser.add_argument("--config", dest="configfile", default=None, help="Specify config file", metavar="FILE") + parser.add_argument("--loglevel", dest="level", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='WARNING', + help="Specify log level (default: %(default)s)", metavar="LEVEL") + parser.add_argument("--logfile", dest="logfile", default='ves_plugin.log', + help="Specify log file (default: %(default)s)", metavar="FILE") + args = parser.parse_args() + + # Create log file + logging.basicConfig(filename=args.logfile, format='%(asctime)s %(message)s', level=args.level) + + # Create Application Instance + application_instance = VESApp() + + # Read from config file + if args.configfile: + config = ConfigParser.ConfigParser() + config.optionxform = lambda option: option + config.read(args.configfile) + # Write Config Values + application_instance.config(config) + else: + logging.warning("No configfile specified, using default options") + + # Start timer for interval + application_instance.init() + + # Read Data from Kakfa + try: + # Kafka consumer & update cache + application_instance.run() + except KeyboardInterrupt: + logging.info(" - Ctrl-C handled, exiting gracefully") + application_instance.shutdown() + sys.exit() + except: + logging.error("Unknown Error") + + +if __name__ == '__main__': + main() |