diff options
Diffstat (limited to '3rd_party/collectd-ves-app/ves_app/ves_app.py')
-rw-r--r-- | 3rd_party/collectd-ves-app/ves_app/ves_app.py | 939 |
1 files changed, 104 insertions, 835 deletions
diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app.py b/3rd_party/collectd-ves-app/ves_app/ves_app.py index 49e7635a..105c66e2 100644 --- a/3rd_party/collectd-ves-app/ves_app/ves_app.py +++ b/3rd_party/collectd-ves-app/ves_app/ves_app.py @@ -22,522 +22,23 @@ import argparse from distutils.util import strtobool from kafka import KafkaConsumer +from normalizer import Normalizer +from normalizer import CollectdValue + try: # For Python 3.0 and later import urllib.request as url except ImportError: # Fall back to Python 2's urllib2 import urllib2 as url -import socket -import time -from threading import Timer -from threading import Lock - - -class Event(object): - """Event header""" - - def __init__(self): - """Construct the common header""" - self.version = 2.0 - self.event_type = "Info" # use "Info" unless a notification is generated - self.domain = "" - self.event_id = "" - self.source_id = "" - self.source_name = "" - self.functional_role = "" - self.reporting_entity_id = "" - self.reporting_entity_name = "" - self.priority = "Normal" # will be derived from event if there is one - self.start_epoch_microsec = 0 - self.last_epoch_micro_sec = 0 - self.sequence = 0 - self.event_name = "" - self.internal_header_fields = {} - self.nfc_naming_code = "" - self.nf_naming_code = "" - - def get_json(self): - """Get the object of the datatype""" - obj = {} - obj['version'] = self.version - obj['eventType'] = self.event_type - obj['domain'] = self.domain - obj['eventId'] = self.event_id - obj['sourceId'] = self.source_id - obj['sourceName'] = self.source_name - obj['reportingEntityId'] = self.reporting_entity_id - obj['reportingEntityName'] = self.reporting_entity_name - obj['priority'] = self.priority - obj['startEpochMicrosec'] = self.start_epoch_microsec - obj['lastEpochMicrosec'] = self.last_epoch_micro_sec - obj['sequence'] = self.sequence - obj['eventName'] = self.event_name - obj['internalHeaderFields'] = self.internal_header_fields - obj['nfcNamingCode'] = self.nfc_naming_code - obj['nfNamingCode'] = self.nf_naming_code - return json.dumps({ - 'event': { - 'commonEventHeader': obj, - self.get_name(): self.get_obj() - } - }).encode() - - def get_name(): - assert False, 'abstract method get_name() is not implemented' - - def get_obj(): - assert False, 'abstract method get_obj() is not implemented' - - -class Field(object): - """field datatype""" - - def __init__(self, name, value): - self.name = name - self.value = value - - def get_obj(self): - return { - 'name': self.name, - 'value': self.value - } - - -class NamedArrayOfFields(object): - """namedArrayOfFields datatype""" - - def __init__(self, name): - self.name = name - self.array_of_fields = [] - - def add(self, field): - self.array_of_fields.append(field.get_obj()) - - def get_obj(self): - return { - 'name': self.name, - 'arrayOfFields': self.array_of_fields - } - - -class VESDataType(object): - """ Base VES datatype """ - - def set_optional(self, obj, key, val): - if val is not None: - obj[key] = val - - -class DiskUsage(VESDataType): - """diskUsage datatype""" - def __init__(self, identifier): - self.disk_identifier = identifier - self.disk_io_time_avg = None - self.disk_io_time_last = None - self.disk_io_time_max = None - self.disk_io_time_min = None - self.disk_merged_read_avg = None - self.disk_merged_read_last = None - self.disk_merged_read_max = None - self.disk_merged_read_min = None - self.disk_merged_write_avg = None - self.disk_merged_write_last = None - self.disk_merged_write_max = None - self.disk_merged_write_min = None - self.disk_octets_read_avg = None - self.disk_octets_read_last = None - self.disk_octets_read_max = None - self.disk_octets_read_min = None - self.disk_octets_write_avg = None - self.disk_octets_write_last = None - self.disk_octets_write_max = None - self.disk_octets_write_min = None - self.disk_ops_read_avg = None - self.disk_ops_read_last = None - self.disk_ops_read_max = None - self.disk_ops_read_min = None - self.disk_ops_write_avg = None - self.disk_ops_write_last = None - self.disk_ops_write_max = None - self.disk_ops_write_min = None - self.disk_pending_operations_avg = None - self.disk_pending_operations_last = None - self.disk_pending_operations_max = None - self.disk_pending_operations_min = None - self.disk_time_read_avg = None - self.disk_time_read_last = None - self.disk_time_read_max = None - self.disk_time_read_min = None - self.disk_time_write_avg = None - self.disk_time_write_last = None - self.disk_time_write_max = None - self.disk_time_write_min = None - - def get_obj(self): - obj = { - # required - 'diskIdentifier': self.disk_identifier - } - self.set_optional(obj, 'diskIoTimeAvg', self.disk_io_time_avg) - self.set_optional(obj, 'diskIoTimeLast', self.disk_io_time_last) - self.set_optional(obj, 'diskIoTimeMax', self.disk_io_time_max) - self.set_optional(obj, 'diskIoTimeMin', self.disk_io_time_min) - self.set_optional(obj, 'diskMergedReadAvg', self.disk_merged_read_avg) - self.set_optional(obj, 'diskMergedReadLast', self.disk_merged_read_last) - self.set_optional(obj, 'diskMergedReadMax', self.disk_merged_read_max) - self.set_optional(obj, 'diskMergedReadMin', self.disk_merged_read_min) - self.set_optional(obj, 'diskMergedWriteAvg', self.disk_merged_write_avg) - self.set_optional(obj, 'diskMergedWriteLast', self.disk_merged_write_last) - self.set_optional(obj, 'diskMergedWriteMax', self.disk_merged_write_max) - self.set_optional(obj, 'diskMergedWriteMin', self.disk_merged_write_min) - self.set_optional(obj, 'diskOctetsReadAvg', self.disk_octets_read_avg) - self.set_optional(obj, 'diskOctetsReadLast', self.disk_octets_read_last) - self.set_optional(obj, 'diskOctetsReadMax', self.disk_octets_read_max) - self.set_optional(obj, 'diskOctetsReadMin', self.disk_octets_read_min) - self.set_optional(obj, 'diskOctetsWriteAvg', self.disk_octets_write_avg) - self.set_optional(obj, 'diskOctetsWriteLast', self.disk_octets_write_last) - self.set_optional(obj, 'diskOctetsWriteMax', self.disk_octets_write_max) - self.set_optional(obj, 'diskOctetsWriteMin', self.disk_octets_write_min) - self.set_optional(obj, 'diskOpsReadAvg', self.disk_ops_read_avg) - self.set_optional(obj, 'diskOpsReadLast', self.disk_ops_read_last) - self.set_optional(obj, 'diskOpsReadMax', self.disk_ops_read_max) - self.set_optional(obj, 'diskOpsReadMin', self.disk_ops_read_min) - self.set_optional(obj, 'diskOpsWriteAvg', self.disk_ops_write_avg) - self.set_optional(obj, 'diskOpsWriteLast', self.disk_ops_write_last) - self.set_optional(obj, 'diskOpsWriteMax', self.disk_ops_write_max) - self.set_optional(obj, 'diskOpsWriteMin', self.disk_ops_write_min) - self.set_optional(obj, 'diskPendingOperationsAvg', self.disk_pending_operations_avg) - self.set_optional(obj, 'diskPendingOperationsLast', self.disk_pending_operations_last) - self.set_optional(obj, 'diskPendingOperationsMax', self.disk_pending_operations_max) - self.set_optional(obj, 'diskPendingOperationsMin', self.disk_pending_operations_min) - self.set_optional(obj, 'diskTimeReadAvg', self.disk_time_read_avg) - self.set_optional(obj, 'diskTimeReadLast', self.disk_time_read_last) - self.set_optional(obj, 'diskTimeReadMax', self.disk_time_read_max) - self.set_optional(obj, 'diskTimeReadMin', self.disk_time_read_min) - self.set_optional(obj, 'diskTimeWriteAvg', self.disk_time_write_avg) - self.set_optional(obj, 'diskTimeWriteLast', self.disk_time_write_last) - self.set_optional(obj, 'diskTimeWriteMax', self.disk_time_write_max) - self.set_optional(obj, 'diskTimeWriteMin', self.disk_time_write_min) - return obj - - -class VNicPerformance(VESDataType): - """vNicPerformance datatype""" - - def __init__(self, identifier): - self.received_broadcast_packets_accumulated = None - self.received_broadcast_packets_delta = None - self.received_discarded_packets_accumulated = None - self.received_discarded_packets_delta = None - self.received_error_packets_accumulated = None - self.received_error_packets_delta = None - self.received_multicast_packets_accumulated = None - self.received_multicast_packets_delta = None - self.received_octets_accumulated = None - self.received_octets_delta = None - self.received_total_packets_accumulated = None - self.received_total_packets_delta = None - self.received_unicast_packets_accumulated = None - self.received_unicast_packets_delta = None - self.transmitted_broadcast_packets_accumulated = None - self.transmitted_broadcast_packets_delta = None - self.transmitted_discarded_packets_accumulated = None - self.transmitted_discarded_packets_delta = None - self.transmitted_error_packets_accumulated = None - self.transmitted_error_packets_delta = None - self.transmitted_multicast_packets_accumulated = None - self.transmitted_multicast_packets_delta = None - self.transmitted_octets_accumulated = None - self.transmitted_octets_delta = None - self.transmitted_total_packets_accumulated = None - self.transmitted_total_packets_delta = None - self.transmitted_unicast_packets_accumulated = None - self.transmitted_unicast_packets_delta = None - self.values_are_suspect = 'true' - self.v_nic_identifier = identifier - - def get_obj(self): - obj = { - # required - 'valuesAreSuspect': self.values_are_suspect, - 'vNicIdentifier': self.v_nic_identifier - } - # optional - self.set_optional(obj, 'receivedBroadcastPacketsAccumulated', self.received_broadcast_packets_accumulated) - self.set_optional(obj, 'receivedBroadcastPacketsDelta', self.received_broadcast_packets_delta) - self.set_optional(obj, 'receivedDiscardedPacketsAccumulated', self.received_discarded_packets_accumulated) - self.set_optional(obj, 'receivedDiscardedPacketsDelta', self.received_discarded_packets_delta) - self.set_optional(obj, 'receivedErrorPacketsAccumulated', self.received_error_packets_accumulated) - self.set_optional(obj, 'receivedErrorPacketsDelta', self.received_error_packets_delta) - self.set_optional(obj, 'receivedMulticastPacketsAccumulated', self.received_multicast_packets_accumulated) - self.set_optional(obj, 'receivedMulticastPacketsDelta', self.received_multicast_packets_delta) - self.set_optional(obj, 'receivedOctetsAccumulated', self.received_octets_accumulated) - self.set_optional(obj, 'receivedOctetsDelta', self.received_octets_delta) - self.set_optional(obj, 'receivedTotalPacketsAccumulated', self.received_total_packets_accumulated) - self.set_optional(obj, 'receivedTotalPacketsDelta', self.received_total_packets_delta) - self.set_optional(obj, 'receivedUnicastPacketsAccumulated', self.received_unicast_packets_accumulated) - self.set_optional(obj, 'receivedUnicastPacketsDelta', self.received_unicast_packets_delta) - self.set_optional(obj, 'transmittedBroadcastPacketsAccumulated', self.transmitted_broadcast_packets_accumulated) - self.set_optional(obj, 'transmittedBroadcastPacketsDelta', self.transmitted_broadcast_packets_delta) - self.set_optional(obj, 'transmittedDiscardedPacketsAccumulated', self.transmitted_discarded_packets_accumulated) - self.set_optional(obj, 'transmittedDiscardedPacketsDelta', self.transmitted_discarded_packets_delta) - self.set_optional(obj, 'transmittedErrorPacketsAccumulated', self.transmitted_error_packets_accumulated) - self.set_optional(obj, 'transmittedErrorPacketsDelta', self.transmitted_error_packets_delta) - self.set_optional(obj, 'transmittedMulticastPacketsAccumulated', self.transmitted_multicast_packets_accumulated) - self.set_optional(obj, 'transmittedMulticastPacketsDelta', self.transmitted_multicast_packets_delta) - self.set_optional(obj, 'transmittedOctetsAccumulated', self.transmitted_octets_accumulated) - self.set_optional(obj, 'transmittedOctetsDelta', self.transmitted_octets_delta) - self.set_optional(obj, 'transmittedTotalPacketsAccumulated', self.transmitted_total_packets_accumulated) - self.set_optional(obj, 'transmittedTotalPacketsDelta', self.transmitted_total_packets_delta) - self.set_optional(obj, 'transmittedUnicastPacketsAccumulated', self.transmitted_unicast_packets_accumulated) - self.set_optional(obj, 'transmittedUnicastPacketsDelta', self.transmitted_unicast_packets_delta) - return obj - - -class CpuUsage(VESDataType): - """cpuUsage datatype""" - - def __init__(self, identifier): - self.cpu_identifier = identifier - self.cpu_idle = None - self.cpu_usage_interrupt = None - self.cpu_usage_nice = None - self.cpu_usage_soft_irq = None - self.cpu_usage_steal = None - self.cpu_usage_system = None - self.cpu_usage_user = None - self.cpu_wait = None - self.percent_usage = 0 - - def get_obj(self): - obj = { - # required - 'cpuIdentifier': self.cpu_identifier, - 'percentUsage': self.percent_usage - } - # optional - self.set_optional(obj, 'cpuIdle', self.cpu_idle) - self.set_optional(obj, 'cpuUsageInterrupt', self.cpu_usage_interrupt) - self.set_optional(obj, 'cpuUsageNice', self.cpu_usage_nice) - self.set_optional(obj, 'cpuUsageSoftIrq', self.cpu_usage_soft_irq) - self.set_optional(obj, 'cpuUsageSteal', self.cpu_usage_steal) - self.set_optional(obj, 'cpuUsageSystem', self.cpu_usage_system) - self.set_optional(obj, 'cpuUsageUser', self.cpu_usage_user) - self.set_optional(obj, 'cpuWait', self.cpu_wait) - return obj - - -class MemoryUsage(VESDataType): - """memoryUsage datatype""" - - def __init__(self, identifier): - self.memory_buffered = None - self.memory_cached = None - self.memory_configured = None - self.memory_free = None - self.memory_slab_recl = None - self.memory_slab_unrecl = None - self.memory_used = None - self.vm_identifier = identifier - - def __str__(self): - """ for debug purposes """ - return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n' \ - 'cached : {cached}\nconfigured : {configured}\nfree : {free}\n' \ - 'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n' \ - 'used : {used}\n'.format(buffered=self.memory_buffered, - cached=self.memory_cached, configured=self.memory_configured, - free=self.memory_free, slab_recl=self.memory_slab_recl, - slab_unrecl=self.memory_slab_unrecl, used=self.memory_used, - vm_identifier=self.vm_identifier) - - def get_memory_free(self): - if self.memory_free is None: - # calculate the free memory - if None not in (self.memory_configured, self.memory_used): - return self.memory_configured - self.memory_used - else: - # required field, so return zero - return 0 - else: - return self.memory_free - - def get_memory_used(self): - if self.memory_used is None: - # calculate the memory used - if None not in (self.memory_configured, self.memory_free, self.memory_buffered, - self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): - return self.memory_configured - (self.memory_free + - self.memory_buffered + self.memory_cached + - self.memory_slab_recl + self.memory_slab_unrecl) - else: - # required field, so return zero - return 0 - else: - return self.memory_used - - def get_memory_total(self): - if self.memory_configured is None: - # calculate the total memory - if None not in (self.memory_used, self.memory_free, self.memory_buffered, - self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): - return (self.memory_used + self.memory_free + - self.memory_buffered + self.memory_cached + - self.memory_slab_recl + self.memory_slab_unrecl) - else: - return None - else: - return self.memory_configured - - def get_obj(self): - obj = { - # required fields - 'memoryFree': self.get_memory_free(), - 'memoryUsed': self.get_memory_used(), - 'vmIdentifier': self.vm_identifier - } - # optional fields - self.set_optional(obj, 'memoryBuffered', self.memory_buffered) - self.set_optional(obj, 'memoryCached', self.memory_cached) - self.set_optional(obj, 'memoryConfigured', self.memory_configured) - self.set_optional(obj, 'memorySlabRecl', self.memory_slab_recl) - self.set_optional(obj, 'memorySlabUnrecl', self.memory_slab_unrecl) - return obj - -class MeasurementsForVfScaling(Event): - """MeasurementsForVfScaling datatype""" - - def __init__(self, event_id): - """Construct the header""" - super(MeasurementsForVfScaling, self).__init__() - # common attributes - self.domain = "measurementsForVfScaling" - self.event_type = 'hostOS' - self.event_id = event_id - # measurement attributes - self.additional_measurements = [] - self.codec_usage_array = [] - self.concurrent_sessions = 0 - self.configured_entities = 0 - self.cpu_usage_array = [] - self.feature_usage_array = [] - self.filesystem_usage_array = [] - self.latency_distribution = [] - self.mean_request_latency = 0 - self.measurement_interval = 0 - self.number_of_media_ports_in_use = 0 - self.request_rate = 0 - self.vnfc_scaling_metric = 0 - self.additional_fields = [] - self.additional_objects = [] - self.disk_usage_array = [] - self.measurements_for_vf_scaling_version = 2.0 - self.memory_usage_array = [] - self.v_nic_performance_array = [] - - def add_additional_measurement(self, named_array): - self.additional_measurements.append(named_array.get_obj()) - - def add_additional_fields(self, field): - self.additional_fields.append(field.get_obj()) - - def add_memory_usage(self, mem_usage): - self.memory_usage_array.append(mem_usage.get_obj()) - - def add_cpu_usage(self, cpu_usage): - self.cpu_usage_array.append(cpu_usage.get_obj()) - - def add_v_nic_performance(self, nic_performance): - self.v_nic_performance_array.append(nic_performance.get_obj()) - - def add_disk_usage(self, disk_usage): - self.disk_usage_array.append(disk_usage.get_obj()) - - def get_obj(self): - """Get the object of the datatype""" - obj = {} - obj['additionalMeasurements'] = self.additional_measurements - obj['codecUsageArray'] = self.codec_usage_array - obj['concurrentSessions'] = self.concurrent_sessions - obj['configuredEntities'] = self.configured_entities - obj['cpuUsageArray'] = self.cpu_usage_array - obj['featureUsageArray'] = self.feature_usage_array - obj['filesystemUsageArray'] = self.filesystem_usage_array - obj['latencyDistribution'] = self.latency_distribution - obj['meanRequestLatency'] = self.mean_request_latency - obj['measurementInterval'] = self.measurement_interval - obj['numberOfMediaPortsInUse'] = self.number_of_media_ports_in_use - obj['requestRate'] = self.request_rate - obj['vnfcScalingMetric'] = self.vnfc_scaling_metric - obj['additionalFields'] = self.additional_fields - obj['additionalObjects'] = self.additional_objects - obj['diskUsageArray'] = self.disk_usage_array - obj['measurementsForVfScalingVersion'] = self.measurements_for_vf_scaling_version - obj['memoryUsageArray'] = self.memory_usage_array - obj['vNicPerformanceArray'] = self.v_nic_performance_array - return obj - - def get_name(self): - """Name of datatype""" - return "measurementsForVfScalingFields" - - -class Fault(Event): - """Fault datatype""" - - def __init__(self, event_id): - """Construct the header""" - super(Fault, self).__init__() - # common attributes - self.domain = "fault" - self.event_id = event_id - self.event_type = "Fault" - # fault attributes - self.fault_fields_version = 1.1 - self.event_severity = 'NORMAL' - self.event_source_type = 'other(0)' - self.alarm_condition = '' - self.specific_problem = '' - self.vf_status = 'Active' - self.alarm_interface_a = '' - self.alarm_additional_information = [] - self.event_category = "" - - def get_name(self): - """Name of datatype""" - return 'faultFields' - - def get_obj(self): - """Get the object of the datatype""" - obj = {} - obj['faultFieldsVersion'] = self.fault_fields_version - obj['eventSeverity'] = self.event_severity - obj['eventSourceType'] = self.event_source_type - obj['alarmCondition'] = self.alarm_condition - obj['specificProblem'] = self.specific_problem - obj['vfStatus'] = self.vf_status - obj['alarmInterfaceA'] = self.alarm_interface_a - obj['alarmAdditionalInformation'] = self.alarm_additional_information - obj['eventCategory'] = self.event_category - return obj - - -class VESApp(object): +class VESApp(Normalizer): """VES Application""" def __init__(self): """Application initialization""" - self.__plugin_data_cache = { - 'cpu': {'interval': 0.0, 'vls': []}, - 'virt': {'interval': 0.0, 'vls': []}, - 'disk': {'interval': 0.0, 'vls': []}, - 'interface': {'interval': 0.0, 'vls': []}, - 'memory': {'interval': 0.0, 'vls': []} - } - self.__app_config = { + self._app_config = { 'Domain': '127.0.0.1', 'Port': 30000, 'Path': '', @@ -546,270 +47,56 @@ class VESApp(object): 'Topic': '', 'UseHttps': False, 'SendEventInterval': 20.0, - 'FunctionalRole': 'Collectd VES Agent', 'ApiVersion': 5.1, 'KafkaPort': 9092, 'KafkaBroker': 'localhost' } - self.__host_name = None - self.__ves_timer = None - self.__lock = Lock() - self.__event_id = 0 - - def get_event_id(self): - """get event id""" - self.__event_id += 1 - return str(self.__event_id) - - def lock(self): - """Lock the application""" - self.__lock.acquire() - - def unlock(self): - """Unlock the application""" - self.__lock.release() - - def start_timer(self): - """Start event timer""" - self.__ves_timer = Timer(self.__app_config['SendEventInterval'], self.__on_time) - self.__ves_timer.start() - - def stop_timer(self): - """Stop event timer""" - self.__ves_timer.cancel() - - def __on_time(self): - """Timer thread""" - self.event_timer() - self.start_timer() - def event_send(self, event): + def send_data(self, event): """Send event to VES""" server_url = "http{}://{}:{}{}/eventListener/v{}{}".format( - 's' if self.__app_config['UseHttps'] else '', self.__app_config['Domain'], - int(self.__app_config['Port']), '{}'.format( - '/{}'.format(self.__app_config['Path']) if (len(self.__app_config['Path']) > 0) else ''), - int(self.__app_config['ApiVersion']), '{}'.format( - '/{}'.format(self.__app_config['Topic']) if (len(self.__app_config['Topic']) > 0) else '')) + 's' if self._app_config['UseHttps'] else '', + self._app_config['Domain'], int(self._app_config['Port']), + '{}'.format('/{}'.format(self._app_config['Path']) if len( + self._app_config['Path']) > 0 else ''), + int(self._app_config['ApiVersion']), '{}'.format( + '/{}'.format(self._app_config['Topic']) if len( + self._app_config['Topic']) > 0 else '')) logging.info('Vendor Event Listener is at: {}'.format(server_url)) credentials = base64.b64encode('{}:{}'.format( - self.__app_config['Username'], self.__app_config['Password']).encode()).decode() + self._app_config['Username'], + self._app_config['Password']).encode()).decode() logging.info('Authentication credentials are: {}'.format(credentials)) try: request = url.Request(server_url) request.add_header('Authorization', 'Basic {}'.format(credentials)) request.add_header('Content-Type', 'application/json') - logging.debug("Sending {} to {}".format(event.get_json(), server_url)) - vel = url.urlopen(request, event.get_json(), timeout=1) + event_str = json.dumps(event).encode() + logging.debug("Sending {} to {}".format(event_str, server_url)) + url.urlopen(request, event_str, timeout=1) logging.debug("Sent data to {} successfully".format(server_url)) except url.HTTPError as e: logging.error('Vendor Event Listener exception: {}'.format(e)) except url.URLError as e: - logging.error('Vendor Event Listener is is not reachable: {}'.format(e)) + logging.error( + 'Vendor Event Listener is is not reachable: {}'.format(e)) except Exception as e: logging.error('Vendor Event Listener error: {}'.format(e)) - def bytes_to_kb(self, bytes): - """Convert bytes to kibibytes""" - return round((bytes / 1024.0), 3) - - def get_hostname(self): - return socket.gethostname() - - def send_host_measurements(self): - # get list of all VMs - virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total', - mark_as_read=False) - vm_names = [x['plugin_instance'] for x in virt_vcpu_total] - for vm_name in vm_names: - # make sure that 'virt' plugin cache is up-to-date - vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name, - mark_as_read=False) - us_up_to_date = True - for vm_value in vm_values: - if vm_value['updated'] == False: - logging.warning("%s" % vm_value) - us_up_to_date = False - break - if not us_up_to_date: - # one of the cache value is not up-to-date, break - logging.warning("virt collectd cache values are not up-to-date for {}".format(vm_name)) - continue - # values are up-to-date, create an event message - measurement = MeasurementsForVfScaling(self.get_event_id()) - measurement.functional_role = self.__app_config['FunctionalRole'] - # fill out reporting_entity - measurement.reporting_entity_id = self.get_hostname() - measurement.reporting_entity_name = measurement.reporting_entity_id - # set source as a host value - measurement.source_id = vm_name - measurement.source_name = measurement.source_id - # fill out EpochMicrosec (convert to us) - measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000) - # plugin interval - measurement.measurement_interval = self.__plugin_data_cache['virt']['interval'] - # memoryUsage - mem_usage = MemoryUsage(vm_name) - memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='memory', type_instance='total') - memory_unused = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='memory', type_instance='unused') - memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='memory', type_instance='rss') - if len(memory_total) > 0: - mem_usage.memory_configured = self.bytes_to_kb(memory_total[0]['values'][0]) - if len(memory_unused) > 0: - mem_usage.memory_free = self.bytes_to_kb(memory_unused[0]['values'][0]) - elif len(memory_rss) > 0: - mem_usage.memory_free = self.bytes_to_kb(memory_rss[0]['values'][0]) - # since, "used" metric is not provided by virt plugn, set the rest of the memory stats - # to zero to calculate used based on provided stats only - mem_usage.memory_buffered = mem_usage.memory_cached = mem_usage.memory_slab_recl = \ - mem_usage.memory_slab_unrecl = 0 - measurement.add_memory_usage(mem_usage) - # cpuUsage - virt_vcpus = self.cache_get_value(plugin_instance=vm_name, - plugin_name='virt', type_name='virt_vcpu') - for virt_vcpu in virt_vcpus: - cpu_usage = CpuUsage(virt_vcpu['type_instance']) - cpu_usage.percent_usage = self.cpu_ns_to_percentage(virt_vcpu) - measurement.add_cpu_usage(cpu_usage) - # vNicPerformance - if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_packets', mark_as_read=False) - if_names = [x['type_instance'] for x in if_packets] - for if_name in if_names: - if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_packets', type_instance=if_name) - if_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_octets', type_instance=if_name) - if_errors = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_errors', type_instance=if_name) - if_dropped = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_dropped', type_instance=if_name) - v_nic_performance = VNicPerformance(if_name) - v_nic_performance.received_total_packets_accumulated = if_packets[0]['values'][0] - v_nic_performance.transmitted_total_packets_accumulated = if_packets[0]['values'][1] - v_nic_performance.received_octets_accumulated = if_octets[0]['values'][0] - v_nic_performance.transmitted_octets_accumulated = if_octets[0]['values'][1] - v_nic_performance.received_error_packets_accumulated = if_errors[0]['values'][0] - v_nic_performance.transmitted_error_packets_accumulated = if_errors[0]['values'][1] - v_nic_performance.received_discarded_packets_accumulated = if_dropped[0]['values'][0] - v_nic_performance.transmitted_discarded_packets_accumulated = if_dropped[0]['values'][1] - measurement.add_v_nic_performance(v_nic_performance) - # diskUsage - disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='disk_octets', mark_as_read=False) - disk_names = [x['type_instance'] for x in disk_octets] - for disk_name in disk_names: - disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='disk_octets', type_instance=disk_name) - disk_ops = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='disk_ops', type_instance=disk_name) - disk_usage = DiskUsage(disk_name) - disk_usage.disk_octets_read_last = disk_octets[0]['values'][0] - disk_usage.disk_octets_write_last = disk_octets[0]['values'][1] - disk_usage.disk_ops_read_last = disk_ops[0]['values'][0] - disk_usage.disk_ops_write_last = disk_ops[0]['values'][1] - measurement.add_disk_usage(disk_usage) - # add additional measurements (perf) - perf_values = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='perf') - named_array = NamedArrayOfFields('perf') - for perf in perf_values: - named_array.add(Field(perf['type_instance'], str(perf['values'][0]))) - measurement.add_additional_measurement(named_array) - # add host values as additional measurements - self.set_additional_fields(measurement, exclude_plugins=['virt']) - # send event to the VES - self.event_send(measurement) - if len(vm_names) > 0: - # mark the additional measurements metrics as read - self.mark_cache_values_as_read(exclude_plugins=['virt']) - - def event_timer(self): - """Event timer thread""" - self.lock() - try: - self.send_host_measurements() - finally: - self.unlock() - - def mark_cache_values_as_read(self, exclude_plugins=None): - """mark the cache values as read""" - for plugin_name in self.__plugin_data_cache.keys(): - if (exclude_plugins != None and plugin_name in exclude_plugins): - # skip excluded plugins - continue; - for val in self.__plugin_data_cache[plugin_name]['vls']: - val['updated'] = False - - def set_additional_measurements(self, measurement, exclude_plugins=None): - """Set addition measurement filed with host/guets values""" - # add host/guest values as additional measurements - for plugin_name in self.__plugin_data_cache.keys(): - if (exclude_plugins != None and plugin_name in exclude_plugins): - # skip excluded plugins - continue; - for val in self.__plugin_data_cache[plugin_name]['vls']: - if val['updated']: - array_name = self.make_dash_string(plugin_name, val['plugin_instance'], - val['type_instance']) - named_array = NamedArrayOfFields(array_name) - - for index in range(len(val['dsnames'])): - mname = '{}-{}'.format(val['type'], val['dsnames'][index]) - named_array.add(Field(mname, str(val['values'][index]))) - measurement.add_additional_measurement(named_array); - val['updated'] = False - - def set_additional_fields(self, measurement, exclude_plugins=None): - # set host values as additional fields - for plugin_name in self.__plugin_data_cache.keys(): - if (exclude_plugins != None and plugin_name in exclude_plugins): - # skip excluded plugins - continue; - for val in self.__plugin_data_cache[plugin_name]['vls']: - if val['updated']: - name_prefix = self.make_dash_string(plugin_name, val['plugin_instance'], - val['type_instance']) - - for index in range(len(val['dsnames'])): - field_name = self.make_dash_string(name_prefix, val['type'], val['dsnames'][index]) - measurement.add_additional_fields(Field(field_name, str(val['values'][index]))) - - def cpu_ns_to_percentage(self, vl): - """Convert CPU usage ns to CPU %""" - total = vl['values'][0] - total_time = vl['time'] - pre_total = vl['pre_values'][0] - pre_total_time = vl['pre_time'] - if (total_time - pre_total_time) == 0: - # return zero usage if time diff is zero - return 0.0 - percent = (100.0 * (total - pre_total)) / ((total_time - pre_total_time) * 1000000000.0) - logging.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format( - pre_total_time, pre_total, total_time, total, round(percent, 2))) - return round(percent, 2) - - def make_dash_string(self, *args): - """Join non empty strings with dash symbol""" - return '-'.join(filter(lambda x: len(x) > 0, args)) - def config(self, config): """VES option configuration""" for key, value in config.items('config'): - if key in self.__app_config: + if key in self._app_config: try: - if type(self.__app_config[key]) == int: + if type(self._app_config[key]) == int: value = int(value) - elif type(self.__app_config[key]) == float: + elif type(self._app_config[key]) == float: value = float(value) - elif type(self.__app_config[key]) == bool: - value = bool(distutils.util.strtobool(value)) + elif type(self._app_config[key]) == bool: + value = bool(strtobool(value)) - if isinstance(value, type(self.__app_config[key])): - self.__app_config[key] = value + if isinstance(value, type(self._app_config[key])): + self._app_config[key] = value else: logging.error("Type mismatch with %s" % key) sys.exit() @@ -820,125 +107,107 @@ class VESApp(object): logging.error("Incorrect key configuration %s" % key) sys.exit() - def init(self): - """Initialisation""" - # start the VES timer - self.start_timer() - - def update_cache_value(self, kafka_data): - """Update value internal collectd cache values or create new one - Please note, the cache should be locked before using this function""" - found = False - if kafka_data['plugin'] not in self.__plugin_data_cache: - self.__plugin_data_cache[kafka_data['plugin']] = {'vls': []} - plugin_vl = self.__plugin_data_cache[kafka_data['plugin']]['vls'] - - for index in range(len(plugin_vl)): - # record found, so just update time the values - if (plugin_vl[index]['plugin_instance'] == kafka_data['plugin_instance']) and \ - (plugin_vl[index]['type_instance'] == kafka_data['type_instance']) and \ - (plugin_vl[index]['type'] == kafka_data['type']): - plugin_vl[index]['pre_time'] = plugin_vl[index]['time'] - plugin_vl[index]['time'] = kafka_data['time'] - plugin_vl[index]['pre_values'] = plugin_vl[index]['values'] - plugin_vl[index]['values'] = kafka_data['values'] - plugin_vl[index]['dsnames'] = kafka_data['dsnames'] - plugin_vl[index]['updated'] = True - found = True - break - if not found: - value = {} - # create new cache record - value['plugin_instance'] = kafka_data['plugin_instance'] - value['type_instance'] = kafka_data['type_instance'] - value['values'] = kafka_data['values'] - value['pre_values'] = kafka_data['values'] - value['type'] = kafka_data['type'] - value['time'] = kafka_data['time'] - value['pre_time'] = kafka_data['time'] - value['host'] = kafka_data['host'] - value['dsnames'] = kafka_data['dsnames'] - value['updated'] = True - self.__plugin_data_cache[kafka_data['plugin']]['vls'].append(value) - # update plugin interval based on one received in the value - self.__plugin_data_cache[kafka_data['plugin']]['interval'] = kafka_data['interval'] - - def cache_get_value(self, plugin_name=None, plugin_instance=None, - type_name=None, type_instance=None, type_names=None, mark_as_read=True): - """Get cache value by given criteria""" - ret_list = [] - if plugin_name in self.__plugin_data_cache: - for val in self.__plugin_data_cache[plugin_name]['vls']: - if (type_name == None or type_name == val['type']) and \ - (plugin_instance == None or plugin_instance == val['plugin_instance']) and \ - (type_instance == None or type_instance == val['type_instance']) and \ - (type_names == None or val['type'] in type_names): - if mark_as_read: - val['updated'] = False - ret_list.append(val) - return ret_list - - def shutdown(self): - """Shutdown method for clean up""" - self.stop_timer() + def init(self, configfile, schema_file): + if configfile is not None: + # read VES configuration file if provided + config = ConfigParser.ConfigParser() + config.optionxform = lambda option: option + config.read(configfile) + self.config(config) + # initialize normalizer + self.initialize(schema_file, self._app_config['SendEventInterval']) def run(self): """Consumer JSON data from kafka broker""" - kafka_server = '%s:%s' % (self.__app_config.get('KafkaBroker'), self.__app_config.get('KafkaPort')) - consumer = KafkaConsumer('collectd', bootstrap_servers=kafka_server, auto_offset_reset='latest', - enable_auto_commit=False, - value_deserializer=lambda m: json.loads(m.decode('ascii'))) + kafka_server = '{}:{}'.format( + self._app_config.get('KafkaBroker'), + self._app_config.get('KafkaPort')) + consumer = KafkaConsumer( + 'collectd', bootstrap_servers=kafka_server, + auto_offset_reset='latest', enable_auto_commit=False, + value_deserializer=lambda m: json.loads(m.decode('ascii'))) for message in consumer: for kafka_data in message.value: - self.lock() - try: - # Receive Kafka data and update cache values - self.update_cache_value(kafka_data) - - finally: - self.unlock() + # { + # u'dstypes': [u'derive'], + # u'plugin': u'cpu', + # u'dsnames': [u'value'], + # u'interval': 10.0, + # u'host': u'localhost', + # u'values': [99.9978996416267], + # u'time': 1502114956.244, + # u'plugin_instance': u'44', + # u'type_instance': u'idle', + # u'type': u'cpu' + # } + logging.debug('{}:run():data={}'.format( + self.__class__.__name__, kafka_data)) + for ds_name in kafka_data['dsnames']: + index = kafka_data['dsnames'].index(ds_name) + val_hash = CollectdValue.hash_gen( + kafka_data['host'], kafka_data['plugin'], + kafka_data['plugin_instance'], kafka_data['type'], + kafka_data['type_instance'], ds_name) + collector = self.get_collector() + val = collector.get(val_hash) + if val: + # update the value + val.value = kafka_data['values'][index] + val.time = kafka_data['time'] + del(val) + else: + # add new value into the collector + val = CollectdValue() + val.host = kafka_data['host'] + val.plugin = kafka_data['plugin'] + val.plugin_instance = kafka_data['plugin_instance'] + val.type = kafka_data['type'] + val.type_instance = kafka_data['type_instance'] + val.value = kafka_data['values'][index] + val.interval = kafka_data['interval'] + val.time = kafka_data['time'] + val.ds_name = ds_name + collector.add(val) def main(): # Parsing cmdline options parser = argparse.ArgumentParser() - parser.add_argument("--config", dest="configfile", default=None, help="Specify config file", metavar="FILE") - parser.add_argument("--loglevel", dest="level", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='WARNING', - help="Specify log level (default: %(default)s)", metavar="LEVEL") + parser.add_argument("--events-schema", dest="schema", required=True, + help="YAML events schema definition", metavar="FILE") + parser.add_argument("--config", dest="configfile", default=None, + help="Specify config file", metavar="FILE") + parser.add_argument("--loglevel", dest="level", default='INFO', + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], + help="Specify log level (default: %(default)s)", + metavar="LEVEL") parser.add_argument("--logfile", dest="logfile", default='ves_app.log', - help="Specify log file (default: %(default)s)", metavar="FILE") + help="Specify log file (default: %(default)s)", + metavar="FILE") args = parser.parse_args() # Create log file - logging.basicConfig(filename=args.logfile, format='%(asctime)s %(message)s', level=args.level) + logging.basicConfig(filename=args.logfile, + format='%(asctime)s %(message)s', + level=args.level) + if args.configfile is None: + logging.warning("No configfile specified, using default options") # Create Application Instance application_instance = VESApp() + application_instance.init(args.configfile, args.schema) - # Read from config file - if args.configfile: - config = ConfigParser.ConfigParser() - config.optionxform = lambda option: option - config.read(args.configfile) - # Write Config Values - application_instance.config(config) - else: - logging.warning("No configfile specified, using default options") - - # Start timer for interval - application_instance.init() - - # Read Data from Kakfa try: - # Kafka consumer & update cache + # Run the plugin application_instance.run() except KeyboardInterrupt: logging.info(" - Ctrl-C handled, exiting gracefully") - application_instance.shutdown() + except Exception as e: + logging.error('{}, {}'.format(type(e), e)) + finally: + application_instance.destroy() sys.exit() - except: - logging.error("Unknown Error") if __name__ == '__main__': |