summaryrefslogtreecommitdiffstats
path: root/3rd_party/collectd-ves-app/ves_app/ves_app.py
diff options
context:
space:
mode:
Diffstat (limited to '3rd_party/collectd-ves-app/ves_app/ves_app.py')
-rw-r--r--3rd_party/collectd-ves-app/ves_app/ves_app.py939
1 files changed, 104 insertions, 835 deletions
diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app.py b/3rd_party/collectd-ves-app/ves_app/ves_app.py
index 49e7635a..105c66e2 100644
--- a/3rd_party/collectd-ves-app/ves_app/ves_app.py
+++ b/3rd_party/collectd-ves-app/ves_app/ves_app.py
@@ -22,522 +22,23 @@ import argparse
from distutils.util import strtobool
from kafka import KafkaConsumer
+from normalizer import Normalizer
+from normalizer import CollectdValue
+
try:
# For Python 3.0 and later
import urllib.request as url
except ImportError:
# Fall back to Python 2's urllib2
import urllib2 as url
-import socket
-import time
-from threading import Timer
-from threading import Lock
-
-
-class Event(object):
- """Event header"""
-
- def __init__(self):
- """Construct the common header"""
- self.version = 2.0
- self.event_type = "Info" # use "Info" unless a notification is generated
- self.domain = ""
- self.event_id = ""
- self.source_id = ""
- self.source_name = ""
- self.functional_role = ""
- self.reporting_entity_id = ""
- self.reporting_entity_name = ""
- self.priority = "Normal" # will be derived from event if there is one
- self.start_epoch_microsec = 0
- self.last_epoch_micro_sec = 0
- self.sequence = 0
- self.event_name = ""
- self.internal_header_fields = {}
- self.nfc_naming_code = ""
- self.nf_naming_code = ""
-
- def get_json(self):
- """Get the object of the datatype"""
- obj = {}
- obj['version'] = self.version
- obj['eventType'] = self.event_type
- obj['domain'] = self.domain
- obj['eventId'] = self.event_id
- obj['sourceId'] = self.source_id
- obj['sourceName'] = self.source_name
- obj['reportingEntityId'] = self.reporting_entity_id
- obj['reportingEntityName'] = self.reporting_entity_name
- obj['priority'] = self.priority
- obj['startEpochMicrosec'] = self.start_epoch_microsec
- obj['lastEpochMicrosec'] = self.last_epoch_micro_sec
- obj['sequence'] = self.sequence
- obj['eventName'] = self.event_name
- obj['internalHeaderFields'] = self.internal_header_fields
- obj['nfcNamingCode'] = self.nfc_naming_code
- obj['nfNamingCode'] = self.nf_naming_code
- return json.dumps({
- 'event': {
- 'commonEventHeader': obj,
- self.get_name(): self.get_obj()
- }
- }).encode()
-
- def get_name():
- assert False, 'abstract method get_name() is not implemented'
-
- def get_obj():
- assert False, 'abstract method get_obj() is not implemented'
-
-
-class Field(object):
- """field datatype"""
-
- def __init__(self, name, value):
- self.name = name
- self.value = value
-
- def get_obj(self):
- return {
- 'name': self.name,
- 'value': self.value
- }
-
-
-class NamedArrayOfFields(object):
- """namedArrayOfFields datatype"""
-
- def __init__(self, name):
- self.name = name
- self.array_of_fields = []
-
- def add(self, field):
- self.array_of_fields.append(field.get_obj())
-
- def get_obj(self):
- return {
- 'name': self.name,
- 'arrayOfFields': self.array_of_fields
- }
-
-
-class VESDataType(object):
- """ Base VES datatype """
-
- def set_optional(self, obj, key, val):
- if val is not None:
- obj[key] = val
-
-
-class DiskUsage(VESDataType):
- """diskUsage datatype"""
- def __init__(self, identifier):
- self.disk_identifier = identifier
- self.disk_io_time_avg = None
- self.disk_io_time_last = None
- self.disk_io_time_max = None
- self.disk_io_time_min = None
- self.disk_merged_read_avg = None
- self.disk_merged_read_last = None
- self.disk_merged_read_max = None
- self.disk_merged_read_min = None
- self.disk_merged_write_avg = None
- self.disk_merged_write_last = None
- self.disk_merged_write_max = None
- self.disk_merged_write_min = None
- self.disk_octets_read_avg = None
- self.disk_octets_read_last = None
- self.disk_octets_read_max = None
- self.disk_octets_read_min = None
- self.disk_octets_write_avg = None
- self.disk_octets_write_last = None
- self.disk_octets_write_max = None
- self.disk_octets_write_min = None
- self.disk_ops_read_avg = None
- self.disk_ops_read_last = None
- self.disk_ops_read_max = None
- self.disk_ops_read_min = None
- self.disk_ops_write_avg = None
- self.disk_ops_write_last = None
- self.disk_ops_write_max = None
- self.disk_ops_write_min = None
- self.disk_pending_operations_avg = None
- self.disk_pending_operations_last = None
- self.disk_pending_operations_max = None
- self.disk_pending_operations_min = None
- self.disk_time_read_avg = None
- self.disk_time_read_last = None
- self.disk_time_read_max = None
- self.disk_time_read_min = None
- self.disk_time_write_avg = None
- self.disk_time_write_last = None
- self.disk_time_write_max = None
- self.disk_time_write_min = None
-
- def get_obj(self):
- obj = {
- # required
- 'diskIdentifier': self.disk_identifier
- }
- self.set_optional(obj, 'diskIoTimeAvg', self.disk_io_time_avg)
- self.set_optional(obj, 'diskIoTimeLast', self.disk_io_time_last)
- self.set_optional(obj, 'diskIoTimeMax', self.disk_io_time_max)
- self.set_optional(obj, 'diskIoTimeMin', self.disk_io_time_min)
- self.set_optional(obj, 'diskMergedReadAvg', self.disk_merged_read_avg)
- self.set_optional(obj, 'diskMergedReadLast', self.disk_merged_read_last)
- self.set_optional(obj, 'diskMergedReadMax', self.disk_merged_read_max)
- self.set_optional(obj, 'diskMergedReadMin', self.disk_merged_read_min)
- self.set_optional(obj, 'diskMergedWriteAvg', self.disk_merged_write_avg)
- self.set_optional(obj, 'diskMergedWriteLast', self.disk_merged_write_last)
- self.set_optional(obj, 'diskMergedWriteMax', self.disk_merged_write_max)
- self.set_optional(obj, 'diskMergedWriteMin', self.disk_merged_write_min)
- self.set_optional(obj, 'diskOctetsReadAvg', self.disk_octets_read_avg)
- self.set_optional(obj, 'diskOctetsReadLast', self.disk_octets_read_last)
- self.set_optional(obj, 'diskOctetsReadMax', self.disk_octets_read_max)
- self.set_optional(obj, 'diskOctetsReadMin', self.disk_octets_read_min)
- self.set_optional(obj, 'diskOctetsWriteAvg', self.disk_octets_write_avg)
- self.set_optional(obj, 'diskOctetsWriteLast', self.disk_octets_write_last)
- self.set_optional(obj, 'diskOctetsWriteMax', self.disk_octets_write_max)
- self.set_optional(obj, 'diskOctetsWriteMin', self.disk_octets_write_min)
- self.set_optional(obj, 'diskOpsReadAvg', self.disk_ops_read_avg)
- self.set_optional(obj, 'diskOpsReadLast', self.disk_ops_read_last)
- self.set_optional(obj, 'diskOpsReadMax', self.disk_ops_read_max)
- self.set_optional(obj, 'diskOpsReadMin', self.disk_ops_read_min)
- self.set_optional(obj, 'diskOpsWriteAvg', self.disk_ops_write_avg)
- self.set_optional(obj, 'diskOpsWriteLast', self.disk_ops_write_last)
- self.set_optional(obj, 'diskOpsWriteMax', self.disk_ops_write_max)
- self.set_optional(obj, 'diskOpsWriteMin', self.disk_ops_write_min)
- self.set_optional(obj, 'diskPendingOperationsAvg', self.disk_pending_operations_avg)
- self.set_optional(obj, 'diskPendingOperationsLast', self.disk_pending_operations_last)
- self.set_optional(obj, 'diskPendingOperationsMax', self.disk_pending_operations_max)
- self.set_optional(obj, 'diskPendingOperationsMin', self.disk_pending_operations_min)
- self.set_optional(obj, 'diskTimeReadAvg', self.disk_time_read_avg)
- self.set_optional(obj, 'diskTimeReadLast', self.disk_time_read_last)
- self.set_optional(obj, 'diskTimeReadMax', self.disk_time_read_max)
- self.set_optional(obj, 'diskTimeReadMin', self.disk_time_read_min)
- self.set_optional(obj, 'diskTimeWriteAvg', self.disk_time_write_avg)
- self.set_optional(obj, 'diskTimeWriteLast', self.disk_time_write_last)
- self.set_optional(obj, 'diskTimeWriteMax', self.disk_time_write_max)
- self.set_optional(obj, 'diskTimeWriteMin', self.disk_time_write_min)
- return obj
-
-
-class VNicPerformance(VESDataType):
- """vNicPerformance datatype"""
-
- def __init__(self, identifier):
- self.received_broadcast_packets_accumulated = None
- self.received_broadcast_packets_delta = None
- self.received_discarded_packets_accumulated = None
- self.received_discarded_packets_delta = None
- self.received_error_packets_accumulated = None
- self.received_error_packets_delta = None
- self.received_multicast_packets_accumulated = None
- self.received_multicast_packets_delta = None
- self.received_octets_accumulated = None
- self.received_octets_delta = None
- self.received_total_packets_accumulated = None
- self.received_total_packets_delta = None
- self.received_unicast_packets_accumulated = None
- self.received_unicast_packets_delta = None
- self.transmitted_broadcast_packets_accumulated = None
- self.transmitted_broadcast_packets_delta = None
- self.transmitted_discarded_packets_accumulated = None
- self.transmitted_discarded_packets_delta = None
- self.transmitted_error_packets_accumulated = None
- self.transmitted_error_packets_delta = None
- self.transmitted_multicast_packets_accumulated = None
- self.transmitted_multicast_packets_delta = None
- self.transmitted_octets_accumulated = None
- self.transmitted_octets_delta = None
- self.transmitted_total_packets_accumulated = None
- self.transmitted_total_packets_delta = None
- self.transmitted_unicast_packets_accumulated = None
- self.transmitted_unicast_packets_delta = None
- self.values_are_suspect = 'true'
- self.v_nic_identifier = identifier
-
- def get_obj(self):
- obj = {
- # required
- 'valuesAreSuspect': self.values_are_suspect,
- 'vNicIdentifier': self.v_nic_identifier
- }
- # optional
- self.set_optional(obj, 'receivedBroadcastPacketsAccumulated', self.received_broadcast_packets_accumulated)
- self.set_optional(obj, 'receivedBroadcastPacketsDelta', self.received_broadcast_packets_delta)
- self.set_optional(obj, 'receivedDiscardedPacketsAccumulated', self.received_discarded_packets_accumulated)
- self.set_optional(obj, 'receivedDiscardedPacketsDelta', self.received_discarded_packets_delta)
- self.set_optional(obj, 'receivedErrorPacketsAccumulated', self.received_error_packets_accumulated)
- self.set_optional(obj, 'receivedErrorPacketsDelta', self.received_error_packets_delta)
- self.set_optional(obj, 'receivedMulticastPacketsAccumulated', self.received_multicast_packets_accumulated)
- self.set_optional(obj, 'receivedMulticastPacketsDelta', self.received_multicast_packets_delta)
- self.set_optional(obj, 'receivedOctetsAccumulated', self.received_octets_accumulated)
- self.set_optional(obj, 'receivedOctetsDelta', self.received_octets_delta)
- self.set_optional(obj, 'receivedTotalPacketsAccumulated', self.received_total_packets_accumulated)
- self.set_optional(obj, 'receivedTotalPacketsDelta', self.received_total_packets_delta)
- self.set_optional(obj, 'receivedUnicastPacketsAccumulated', self.received_unicast_packets_accumulated)
- self.set_optional(obj, 'receivedUnicastPacketsDelta', self.received_unicast_packets_delta)
- self.set_optional(obj, 'transmittedBroadcastPacketsAccumulated', self.transmitted_broadcast_packets_accumulated)
- self.set_optional(obj, 'transmittedBroadcastPacketsDelta', self.transmitted_broadcast_packets_delta)
- self.set_optional(obj, 'transmittedDiscardedPacketsAccumulated', self.transmitted_discarded_packets_accumulated)
- self.set_optional(obj, 'transmittedDiscardedPacketsDelta', self.transmitted_discarded_packets_delta)
- self.set_optional(obj, 'transmittedErrorPacketsAccumulated', self.transmitted_error_packets_accumulated)
- self.set_optional(obj, 'transmittedErrorPacketsDelta', self.transmitted_error_packets_delta)
- self.set_optional(obj, 'transmittedMulticastPacketsAccumulated', self.transmitted_multicast_packets_accumulated)
- self.set_optional(obj, 'transmittedMulticastPacketsDelta', self.transmitted_multicast_packets_delta)
- self.set_optional(obj, 'transmittedOctetsAccumulated', self.transmitted_octets_accumulated)
- self.set_optional(obj, 'transmittedOctetsDelta', self.transmitted_octets_delta)
- self.set_optional(obj, 'transmittedTotalPacketsAccumulated', self.transmitted_total_packets_accumulated)
- self.set_optional(obj, 'transmittedTotalPacketsDelta', self.transmitted_total_packets_delta)
- self.set_optional(obj, 'transmittedUnicastPacketsAccumulated', self.transmitted_unicast_packets_accumulated)
- self.set_optional(obj, 'transmittedUnicastPacketsDelta', self.transmitted_unicast_packets_delta)
- return obj
-
-
-class CpuUsage(VESDataType):
- """cpuUsage datatype"""
-
- def __init__(self, identifier):
- self.cpu_identifier = identifier
- self.cpu_idle = None
- self.cpu_usage_interrupt = None
- self.cpu_usage_nice = None
- self.cpu_usage_soft_irq = None
- self.cpu_usage_steal = None
- self.cpu_usage_system = None
- self.cpu_usage_user = None
- self.cpu_wait = None
- self.percent_usage = 0
-
- def get_obj(self):
- obj = {
- # required
- 'cpuIdentifier': self.cpu_identifier,
- 'percentUsage': self.percent_usage
- }
- # optional
- self.set_optional(obj, 'cpuIdle', self.cpu_idle)
- self.set_optional(obj, 'cpuUsageInterrupt', self.cpu_usage_interrupt)
- self.set_optional(obj, 'cpuUsageNice', self.cpu_usage_nice)
- self.set_optional(obj, 'cpuUsageSoftIrq', self.cpu_usage_soft_irq)
- self.set_optional(obj, 'cpuUsageSteal', self.cpu_usage_steal)
- self.set_optional(obj, 'cpuUsageSystem', self.cpu_usage_system)
- self.set_optional(obj, 'cpuUsageUser', self.cpu_usage_user)
- self.set_optional(obj, 'cpuWait', self.cpu_wait)
- return obj
-
-
-class MemoryUsage(VESDataType):
- """memoryUsage datatype"""
-
- def __init__(self, identifier):
- self.memory_buffered = None
- self.memory_cached = None
- self.memory_configured = None
- self.memory_free = None
- self.memory_slab_recl = None
- self.memory_slab_unrecl = None
- self.memory_used = None
- self.vm_identifier = identifier
-
- def __str__(self):
- """ for debug purposes """
- return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n' \
- 'cached : {cached}\nconfigured : {configured}\nfree : {free}\n' \
- 'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n' \
- 'used : {used}\n'.format(buffered=self.memory_buffered,
- cached=self.memory_cached, configured=self.memory_configured,
- free=self.memory_free, slab_recl=self.memory_slab_recl,
- slab_unrecl=self.memory_slab_unrecl, used=self.memory_used,
- vm_identifier=self.vm_identifier)
-
- def get_memory_free(self):
- if self.memory_free is None:
- # calculate the free memory
- if None not in (self.memory_configured, self.memory_used):
- return self.memory_configured - self.memory_used
- else:
- # required field, so return zero
- return 0
- else:
- return self.memory_free
-
- def get_memory_used(self):
- if self.memory_used is None:
- # calculate the memory used
- if None not in (self.memory_configured, self.memory_free, self.memory_buffered,
- self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
- return self.memory_configured - (self.memory_free +
- self.memory_buffered + self.memory_cached +
- self.memory_slab_recl + self.memory_slab_unrecl)
- else:
- # required field, so return zero
- return 0
- else:
- return self.memory_used
-
- def get_memory_total(self):
- if self.memory_configured is None:
- # calculate the total memory
- if None not in (self.memory_used, self.memory_free, self.memory_buffered,
- self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
- return (self.memory_used + self.memory_free +
- self.memory_buffered + self.memory_cached +
- self.memory_slab_recl + self.memory_slab_unrecl)
- else:
- return None
- else:
- return self.memory_configured
-
- def get_obj(self):
- obj = {
- # required fields
- 'memoryFree': self.get_memory_free(),
- 'memoryUsed': self.get_memory_used(),
- 'vmIdentifier': self.vm_identifier
- }
- # optional fields
- self.set_optional(obj, 'memoryBuffered', self.memory_buffered)
- self.set_optional(obj, 'memoryCached', self.memory_cached)
- self.set_optional(obj, 'memoryConfigured', self.memory_configured)
- self.set_optional(obj, 'memorySlabRecl', self.memory_slab_recl)
- self.set_optional(obj, 'memorySlabUnrecl', self.memory_slab_unrecl)
- return obj
-
-class MeasurementsForVfScaling(Event):
- """MeasurementsForVfScaling datatype"""
-
- def __init__(self, event_id):
- """Construct the header"""
- super(MeasurementsForVfScaling, self).__init__()
- # common attributes
- self.domain = "measurementsForVfScaling"
- self.event_type = 'hostOS'
- self.event_id = event_id
- # measurement attributes
- self.additional_measurements = []
- self.codec_usage_array = []
- self.concurrent_sessions = 0
- self.configured_entities = 0
- self.cpu_usage_array = []
- self.feature_usage_array = []
- self.filesystem_usage_array = []
- self.latency_distribution = []
- self.mean_request_latency = 0
- self.measurement_interval = 0
- self.number_of_media_ports_in_use = 0
- self.request_rate = 0
- self.vnfc_scaling_metric = 0
- self.additional_fields = []
- self.additional_objects = []
- self.disk_usage_array = []
- self.measurements_for_vf_scaling_version = 2.0
- self.memory_usage_array = []
- self.v_nic_performance_array = []
-
- def add_additional_measurement(self, named_array):
- self.additional_measurements.append(named_array.get_obj())
-
- def add_additional_fields(self, field):
- self.additional_fields.append(field.get_obj())
-
- def add_memory_usage(self, mem_usage):
- self.memory_usage_array.append(mem_usage.get_obj())
-
- def add_cpu_usage(self, cpu_usage):
- self.cpu_usage_array.append(cpu_usage.get_obj())
-
- def add_v_nic_performance(self, nic_performance):
- self.v_nic_performance_array.append(nic_performance.get_obj())
-
- def add_disk_usage(self, disk_usage):
- self.disk_usage_array.append(disk_usage.get_obj())
-
- def get_obj(self):
- """Get the object of the datatype"""
- obj = {}
- obj['additionalMeasurements'] = self.additional_measurements
- obj['codecUsageArray'] = self.codec_usage_array
- obj['concurrentSessions'] = self.concurrent_sessions
- obj['configuredEntities'] = self.configured_entities
- obj['cpuUsageArray'] = self.cpu_usage_array
- obj['featureUsageArray'] = self.feature_usage_array
- obj['filesystemUsageArray'] = self.filesystem_usage_array
- obj['latencyDistribution'] = self.latency_distribution
- obj['meanRequestLatency'] = self.mean_request_latency
- obj['measurementInterval'] = self.measurement_interval
- obj['numberOfMediaPortsInUse'] = self.number_of_media_ports_in_use
- obj['requestRate'] = self.request_rate
- obj['vnfcScalingMetric'] = self.vnfc_scaling_metric
- obj['additionalFields'] = self.additional_fields
- obj['additionalObjects'] = self.additional_objects
- obj['diskUsageArray'] = self.disk_usage_array
- obj['measurementsForVfScalingVersion'] = self.measurements_for_vf_scaling_version
- obj['memoryUsageArray'] = self.memory_usage_array
- obj['vNicPerformanceArray'] = self.v_nic_performance_array
- return obj
-
- def get_name(self):
- """Name of datatype"""
- return "measurementsForVfScalingFields"
-
-
-class Fault(Event):
- """Fault datatype"""
-
- def __init__(self, event_id):
- """Construct the header"""
- super(Fault, self).__init__()
- # common attributes
- self.domain = "fault"
- self.event_id = event_id
- self.event_type = "Fault"
- # fault attributes
- self.fault_fields_version = 1.1
- self.event_severity = 'NORMAL'
- self.event_source_type = 'other(0)'
- self.alarm_condition = ''
- self.specific_problem = ''
- self.vf_status = 'Active'
- self.alarm_interface_a = ''
- self.alarm_additional_information = []
- self.event_category = ""
-
- def get_name(self):
- """Name of datatype"""
- return 'faultFields'
-
- def get_obj(self):
- """Get the object of the datatype"""
- obj = {}
- obj['faultFieldsVersion'] = self.fault_fields_version
- obj['eventSeverity'] = self.event_severity
- obj['eventSourceType'] = self.event_source_type
- obj['alarmCondition'] = self.alarm_condition
- obj['specificProblem'] = self.specific_problem
- obj['vfStatus'] = self.vf_status
- obj['alarmInterfaceA'] = self.alarm_interface_a
- obj['alarmAdditionalInformation'] = self.alarm_additional_information
- obj['eventCategory'] = self.event_category
- return obj
-
-
-class VESApp(object):
+class VESApp(Normalizer):
"""VES Application"""
def __init__(self):
"""Application initialization"""
- self.__plugin_data_cache = {
- 'cpu': {'interval': 0.0, 'vls': []},
- 'virt': {'interval': 0.0, 'vls': []},
- 'disk': {'interval': 0.0, 'vls': []},
- 'interface': {'interval': 0.0, 'vls': []},
- 'memory': {'interval': 0.0, 'vls': []}
- }
- self.__app_config = {
+ self._app_config = {
'Domain': '127.0.0.1',
'Port': 30000,
'Path': '',
@@ -546,270 +47,56 @@ class VESApp(object):
'Topic': '',
'UseHttps': False,
'SendEventInterval': 20.0,
- 'FunctionalRole': 'Collectd VES Agent',
'ApiVersion': 5.1,
'KafkaPort': 9092,
'KafkaBroker': 'localhost'
}
- self.__host_name = None
- self.__ves_timer = None
- self.__lock = Lock()
- self.__event_id = 0
-
- def get_event_id(self):
- """get event id"""
- self.__event_id += 1
- return str(self.__event_id)
-
- def lock(self):
- """Lock the application"""
- self.__lock.acquire()
-
- def unlock(self):
- """Unlock the application"""
- self.__lock.release()
-
- def start_timer(self):
- """Start event timer"""
- self.__ves_timer = Timer(self.__app_config['SendEventInterval'], self.__on_time)
- self.__ves_timer.start()
-
- def stop_timer(self):
- """Stop event timer"""
- self.__ves_timer.cancel()
-
- def __on_time(self):
- """Timer thread"""
- self.event_timer()
- self.start_timer()
- def event_send(self, event):
+ def send_data(self, event):
"""Send event to VES"""
server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
- 's' if self.__app_config['UseHttps'] else '', self.__app_config['Domain'],
- int(self.__app_config['Port']), '{}'.format(
- '/{}'.format(self.__app_config['Path']) if (len(self.__app_config['Path']) > 0) else ''),
- int(self.__app_config['ApiVersion']), '{}'.format(
- '/{}'.format(self.__app_config['Topic']) if (len(self.__app_config['Topic']) > 0) else ''))
+ 's' if self._app_config['UseHttps'] else '',
+ self._app_config['Domain'], int(self._app_config['Port']),
+ '{}'.format('/{}'.format(self._app_config['Path']) if len(
+ self._app_config['Path']) > 0 else ''),
+ int(self._app_config['ApiVersion']), '{}'.format(
+ '/{}'.format(self._app_config['Topic']) if len(
+ self._app_config['Topic']) > 0 else ''))
logging.info('Vendor Event Listener is at: {}'.format(server_url))
credentials = base64.b64encode('{}:{}'.format(
- self.__app_config['Username'], self.__app_config['Password']).encode()).decode()
+ self._app_config['Username'],
+ self._app_config['Password']).encode()).decode()
logging.info('Authentication credentials are: {}'.format(credentials))
try:
request = url.Request(server_url)
request.add_header('Authorization', 'Basic {}'.format(credentials))
request.add_header('Content-Type', 'application/json')
- logging.debug("Sending {} to {}".format(event.get_json(), server_url))
- vel = url.urlopen(request, event.get_json(), timeout=1)
+ event_str = json.dumps(event).encode()
+ logging.debug("Sending {} to {}".format(event_str, server_url))
+ url.urlopen(request, event_str, timeout=1)
logging.debug("Sent data to {} successfully".format(server_url))
except url.HTTPError as e:
logging.error('Vendor Event Listener exception: {}'.format(e))
except url.URLError as e:
- logging.error('Vendor Event Listener is is not reachable: {}'.format(e))
+ logging.error(
+ 'Vendor Event Listener is is not reachable: {}'.format(e))
except Exception as e:
logging.error('Vendor Event Listener error: {}'.format(e))
- def bytes_to_kb(self, bytes):
- """Convert bytes to kibibytes"""
- return round((bytes / 1024.0), 3)
-
- def get_hostname(self):
- return socket.gethostname()
-
- def send_host_measurements(self):
- # get list of all VMs
- virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total',
- mark_as_read=False)
- vm_names = [x['plugin_instance'] for x in virt_vcpu_total]
- for vm_name in vm_names:
- # make sure that 'virt' plugin cache is up-to-date
- vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name,
- mark_as_read=False)
- us_up_to_date = True
- for vm_value in vm_values:
- if vm_value['updated'] == False:
- logging.warning("%s" % vm_value)
- us_up_to_date = False
- break
- if not us_up_to_date:
- # one of the cache value is not up-to-date, break
- logging.warning("virt collectd cache values are not up-to-date for {}".format(vm_name))
- continue
- # values are up-to-date, create an event message
- measurement = MeasurementsForVfScaling(self.get_event_id())
- measurement.functional_role = self.__app_config['FunctionalRole']
- # fill out reporting_entity
- measurement.reporting_entity_id = self.get_hostname()
- measurement.reporting_entity_name = measurement.reporting_entity_id
- # set source as a host value
- measurement.source_id = vm_name
- measurement.source_name = measurement.source_id
- # fill out EpochMicrosec (convert to us)
- measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
- # plugin interval
- measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
- # memoryUsage
- mem_usage = MemoryUsage(vm_name)
- memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='memory', type_instance='total')
- memory_unused = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='memory', type_instance='unused')
- memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='memory', type_instance='rss')
- if len(memory_total) > 0:
- mem_usage.memory_configured = self.bytes_to_kb(memory_total[0]['values'][0])
- if len(memory_unused) > 0:
- mem_usage.memory_free = self.bytes_to_kb(memory_unused[0]['values'][0])
- elif len(memory_rss) > 0:
- mem_usage.memory_free = self.bytes_to_kb(memory_rss[0]['values'][0])
- # since, "used" metric is not provided by virt plugn, set the rest of the memory stats
- # to zero to calculate used based on provided stats only
- mem_usage.memory_buffered = mem_usage.memory_cached = mem_usage.memory_slab_recl = \
- mem_usage.memory_slab_unrecl = 0
- measurement.add_memory_usage(mem_usage)
- # cpuUsage
- virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
- plugin_name='virt', type_name='virt_vcpu')
- for virt_vcpu in virt_vcpus:
- cpu_usage = CpuUsage(virt_vcpu['type_instance'])
- cpu_usage.percent_usage = self.cpu_ns_to_percentage(virt_vcpu)
- measurement.add_cpu_usage(cpu_usage)
- # vNicPerformance
- if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_packets', mark_as_read=False)
- if_names = [x['type_instance'] for x in if_packets]
- for if_name in if_names:
- if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_packets', type_instance=if_name)
- if_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_octets', type_instance=if_name)
- if_errors = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_errors', type_instance=if_name)
- if_dropped = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_dropped', type_instance=if_name)
- v_nic_performance = VNicPerformance(if_name)
- v_nic_performance.received_total_packets_accumulated = if_packets[0]['values'][0]
- v_nic_performance.transmitted_total_packets_accumulated = if_packets[0]['values'][1]
- v_nic_performance.received_octets_accumulated = if_octets[0]['values'][0]
- v_nic_performance.transmitted_octets_accumulated = if_octets[0]['values'][1]
- v_nic_performance.received_error_packets_accumulated = if_errors[0]['values'][0]
- v_nic_performance.transmitted_error_packets_accumulated = if_errors[0]['values'][1]
- v_nic_performance.received_discarded_packets_accumulated = if_dropped[0]['values'][0]
- v_nic_performance.transmitted_discarded_packets_accumulated = if_dropped[0]['values'][1]
- measurement.add_v_nic_performance(v_nic_performance)
- # diskUsage
- disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='disk_octets', mark_as_read=False)
- disk_names = [x['type_instance'] for x in disk_octets]
- for disk_name in disk_names:
- disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='disk_octets', type_instance=disk_name)
- disk_ops = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='disk_ops', type_instance=disk_name)
- disk_usage = DiskUsage(disk_name)
- disk_usage.disk_octets_read_last = disk_octets[0]['values'][0]
- disk_usage.disk_octets_write_last = disk_octets[0]['values'][1]
- disk_usage.disk_ops_read_last = disk_ops[0]['values'][0]
- disk_usage.disk_ops_write_last = disk_ops[0]['values'][1]
- measurement.add_disk_usage(disk_usage)
- # add additional measurements (perf)
- perf_values = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='perf')
- named_array = NamedArrayOfFields('perf')
- for perf in perf_values:
- named_array.add(Field(perf['type_instance'], str(perf['values'][0])))
- measurement.add_additional_measurement(named_array)
- # add host values as additional measurements
- self.set_additional_fields(measurement, exclude_plugins=['virt'])
- # send event to the VES
- self.event_send(measurement)
- if len(vm_names) > 0:
- # mark the additional measurements metrics as read
- self.mark_cache_values_as_read(exclude_plugins=['virt'])
-
- def event_timer(self):
- """Event timer thread"""
- self.lock()
- try:
- self.send_host_measurements()
- finally:
- self.unlock()
-
- def mark_cache_values_as_read(self, exclude_plugins=None):
- """mark the cache values as read"""
- for plugin_name in self.__plugin_data_cache.keys():
- if (exclude_plugins != None and plugin_name in exclude_plugins):
- # skip excluded plugins
- continue;
- for val in self.__plugin_data_cache[plugin_name]['vls']:
- val['updated'] = False
-
- def set_additional_measurements(self, measurement, exclude_plugins=None):
- """Set addition measurement filed with host/guets values"""
- # add host/guest values as additional measurements
- for plugin_name in self.__plugin_data_cache.keys():
- if (exclude_plugins != None and plugin_name in exclude_plugins):
- # skip excluded plugins
- continue;
- for val in self.__plugin_data_cache[plugin_name]['vls']:
- if val['updated']:
- array_name = self.make_dash_string(plugin_name, val['plugin_instance'],
- val['type_instance'])
- named_array = NamedArrayOfFields(array_name)
-
- for index in range(len(val['dsnames'])):
- mname = '{}-{}'.format(val['type'], val['dsnames'][index])
- named_array.add(Field(mname, str(val['values'][index])))
- measurement.add_additional_measurement(named_array);
- val['updated'] = False
-
- def set_additional_fields(self, measurement, exclude_plugins=None):
- # set host values as additional fields
- for plugin_name in self.__plugin_data_cache.keys():
- if (exclude_plugins != None and plugin_name in exclude_plugins):
- # skip excluded plugins
- continue;
- for val in self.__plugin_data_cache[plugin_name]['vls']:
- if val['updated']:
- name_prefix = self.make_dash_string(plugin_name, val['plugin_instance'],
- val['type_instance'])
-
- for index in range(len(val['dsnames'])):
- field_name = self.make_dash_string(name_prefix, val['type'], val['dsnames'][index])
- measurement.add_additional_fields(Field(field_name, str(val['values'][index])))
-
- def cpu_ns_to_percentage(self, vl):
- """Convert CPU usage ns to CPU %"""
- total = vl['values'][0]
- total_time = vl['time']
- pre_total = vl['pre_values'][0]
- pre_total_time = vl['pre_time']
- if (total_time - pre_total_time) == 0:
- # return zero usage if time diff is zero
- return 0.0
- percent = (100.0 * (total - pre_total)) / ((total_time - pre_total_time) * 1000000000.0)
- logging.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format(
- pre_total_time, pre_total, total_time, total, round(percent, 2)))
- return round(percent, 2)
-
- def make_dash_string(self, *args):
- """Join non empty strings with dash symbol"""
- return '-'.join(filter(lambda x: len(x) > 0, args))
-
def config(self, config):
"""VES option configuration"""
for key, value in config.items('config'):
- if key in self.__app_config:
+ if key in self._app_config:
try:
- if type(self.__app_config[key]) == int:
+ if type(self._app_config[key]) == int:
value = int(value)
- elif type(self.__app_config[key]) == float:
+ elif type(self._app_config[key]) == float:
value = float(value)
- elif type(self.__app_config[key]) == bool:
- value = bool(distutils.util.strtobool(value))
+ elif type(self._app_config[key]) == bool:
+ value = bool(strtobool(value))
- if isinstance(value, type(self.__app_config[key])):
- self.__app_config[key] = value
+ if isinstance(value, type(self._app_config[key])):
+ self._app_config[key] = value
else:
logging.error("Type mismatch with %s" % key)
sys.exit()
@@ -820,125 +107,107 @@ class VESApp(object):
logging.error("Incorrect key configuration %s" % key)
sys.exit()
- def init(self):
- """Initialisation"""
- # start the VES timer
- self.start_timer()
-
- def update_cache_value(self, kafka_data):
- """Update value internal collectd cache values or create new one
- Please note, the cache should be locked before using this function"""
- found = False
- if kafka_data['plugin'] not in self.__plugin_data_cache:
- self.__plugin_data_cache[kafka_data['plugin']] = {'vls': []}
- plugin_vl = self.__plugin_data_cache[kafka_data['plugin']]['vls']
-
- for index in range(len(plugin_vl)):
- # record found, so just update time the values
- if (plugin_vl[index]['plugin_instance'] == kafka_data['plugin_instance']) and \
- (plugin_vl[index]['type_instance'] == kafka_data['type_instance']) and \
- (plugin_vl[index]['type'] == kafka_data['type']):
- plugin_vl[index]['pre_time'] = plugin_vl[index]['time']
- plugin_vl[index]['time'] = kafka_data['time']
- plugin_vl[index]['pre_values'] = plugin_vl[index]['values']
- plugin_vl[index]['values'] = kafka_data['values']
- plugin_vl[index]['dsnames'] = kafka_data['dsnames']
- plugin_vl[index]['updated'] = True
- found = True
- break
- if not found:
- value = {}
- # create new cache record
- value['plugin_instance'] = kafka_data['plugin_instance']
- value['type_instance'] = kafka_data['type_instance']
- value['values'] = kafka_data['values']
- value['pre_values'] = kafka_data['values']
- value['type'] = kafka_data['type']
- value['time'] = kafka_data['time']
- value['pre_time'] = kafka_data['time']
- value['host'] = kafka_data['host']
- value['dsnames'] = kafka_data['dsnames']
- value['updated'] = True
- self.__plugin_data_cache[kafka_data['plugin']]['vls'].append(value)
- # update plugin interval based on one received in the value
- self.__plugin_data_cache[kafka_data['plugin']]['interval'] = kafka_data['interval']
-
- def cache_get_value(self, plugin_name=None, plugin_instance=None,
- type_name=None, type_instance=None, type_names=None, mark_as_read=True):
- """Get cache value by given criteria"""
- ret_list = []
- if plugin_name in self.__plugin_data_cache:
- for val in self.__plugin_data_cache[plugin_name]['vls']:
- if (type_name == None or type_name == val['type']) and \
- (plugin_instance == None or plugin_instance == val['plugin_instance']) and \
- (type_instance == None or type_instance == val['type_instance']) and \
- (type_names == None or val['type'] in type_names):
- if mark_as_read:
- val['updated'] = False
- ret_list.append(val)
- return ret_list
-
- def shutdown(self):
- """Shutdown method for clean up"""
- self.stop_timer()
+ def init(self, configfile, schema_file):
+ if configfile is not None:
+ # read VES configuration file if provided
+ config = ConfigParser.ConfigParser()
+ config.optionxform = lambda option: option
+ config.read(configfile)
+ self.config(config)
+ # initialize normalizer
+ self.initialize(schema_file, self._app_config['SendEventInterval'])
def run(self):
"""Consumer JSON data from kafka broker"""
- kafka_server = '%s:%s' % (self.__app_config.get('KafkaBroker'), self.__app_config.get('KafkaPort'))
- consumer = KafkaConsumer('collectd', bootstrap_servers=kafka_server, auto_offset_reset='latest',
- enable_auto_commit=False,
- value_deserializer=lambda m: json.loads(m.decode('ascii')))
+ kafka_server = '{}:{}'.format(
+ self._app_config.get('KafkaBroker'),
+ self._app_config.get('KafkaPort'))
+ consumer = KafkaConsumer(
+ 'collectd', bootstrap_servers=kafka_server,
+ auto_offset_reset='latest', enable_auto_commit=False,
+ value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
for kafka_data in message.value:
- self.lock()
- try:
- # Receive Kafka data and update cache values
- self.update_cache_value(kafka_data)
-
- finally:
- self.unlock()
+ # {
+ # u'dstypes': [u'derive'],
+ # u'plugin': u'cpu',
+ # u'dsnames': [u'value'],
+ # u'interval': 10.0,
+ # u'host': u'localhost',
+ # u'values': [99.9978996416267],
+ # u'time': 1502114956.244,
+ # u'plugin_instance': u'44',
+ # u'type_instance': u'idle',
+ # u'type': u'cpu'
+ # }
+ logging.debug('{}:run():data={}'.format(
+ self.__class__.__name__, kafka_data))
+ for ds_name in kafka_data['dsnames']:
+ index = kafka_data['dsnames'].index(ds_name)
+ val_hash = CollectdValue.hash_gen(
+ kafka_data['host'], kafka_data['plugin'],
+ kafka_data['plugin_instance'], kafka_data['type'],
+ kafka_data['type_instance'], ds_name)
+ collector = self.get_collector()
+ val = collector.get(val_hash)
+ if val:
+ # update the value
+ val.value = kafka_data['values'][index]
+ val.time = kafka_data['time']
+ del(val)
+ else:
+ # add new value into the collector
+ val = CollectdValue()
+ val.host = kafka_data['host']
+ val.plugin = kafka_data['plugin']
+ val.plugin_instance = kafka_data['plugin_instance']
+ val.type = kafka_data['type']
+ val.type_instance = kafka_data['type_instance']
+ val.value = kafka_data['values'][index]
+ val.interval = kafka_data['interval']
+ val.time = kafka_data['time']
+ val.ds_name = ds_name
+ collector.add(val)
def main():
# Parsing cmdline options
parser = argparse.ArgumentParser()
- parser.add_argument("--config", dest="configfile", default=None, help="Specify config file", metavar="FILE")
- parser.add_argument("--loglevel", dest="level", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='WARNING',
- help="Specify log level (default: %(default)s)", metavar="LEVEL")
+ parser.add_argument("--events-schema", dest="schema", required=True,
+ help="YAML events schema definition", metavar="FILE")
+ parser.add_argument("--config", dest="configfile", default=None,
+ help="Specify config file", metavar="FILE")
+ parser.add_argument("--loglevel", dest="level", default='INFO',
+ choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
+ help="Specify log level (default: %(default)s)",
+ metavar="LEVEL")
parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
- help="Specify log file (default: %(default)s)", metavar="FILE")
+ help="Specify log file (default: %(default)s)",
+ metavar="FILE")
args = parser.parse_args()
# Create log file
- logging.basicConfig(filename=args.logfile, format='%(asctime)s %(message)s', level=args.level)
+ logging.basicConfig(filename=args.logfile,
+ format='%(asctime)s %(message)s',
+ level=args.level)
+ if args.configfile is None:
+ logging.warning("No configfile specified, using default options")
# Create Application Instance
application_instance = VESApp()
+ application_instance.init(args.configfile, args.schema)
- # Read from config file
- if args.configfile:
- config = ConfigParser.ConfigParser()
- config.optionxform = lambda option: option
- config.read(args.configfile)
- # Write Config Values
- application_instance.config(config)
- else:
- logging.warning("No configfile specified, using default options")
-
- # Start timer for interval
- application_instance.init()
-
- # Read Data from Kakfa
try:
- # Kafka consumer & update cache
+ # Run the plugin
application_instance.run()
except KeyboardInterrupt:
logging.info(" - Ctrl-C handled, exiting gracefully")
- application_instance.shutdown()
+ except Exception as e:
+ logging.error('{}, {}'.format(type(e), e))
+ finally:
+ application_instance.destroy()
sys.exit()
- except:
- logging.error("Unknown Error")
if __name__ == '__main__':