diff options
-rw-r--r-- | 3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py | 699 | ||||
-rwxr-xr-x | 3rd_party/ovs_pmd_stats/ovs_pmd_stats.py | 109 | ||||
-rw-r--r-- | baro_tests/barometer.py | 24 | ||||
-rwxr-xr-x | ci/utility/collectd_build_rpm.sh | 6 |
4 files changed, 634 insertions, 204 deletions
diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py index 3f0a069f..ee27bcdf 100644 --- a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py +++ b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py @@ -40,7 +40,7 @@ class Event(object): def __init__(self): """Construct the common header""" - self.version = 1.1 + self.version = 2.0 self.event_type = "Info" # use "Info" unless a notification is generated self.domain = "" self.event_id = "" @@ -53,6 +53,10 @@ class Event(object): self.start_epoch_microsec = 0 self.last_epoch_micro_sec = 0 self.sequence = 0 + self.event_name = "" + self.internal_header_fields = {} + self.nfc_naming_code = "" + self.nf_naming_code = "" def get_json(self): """Get the object of the datatype""" @@ -63,13 +67,16 @@ class Event(object): obj['eventId'] = self.event_id obj['sourceId'] = self.source_id obj['sourceName'] = self.source_name - obj['functionalRole'] = self.functional_role obj['reportingEntityId'] = self.reporting_entity_id obj['reportingEntityName'] = self.reporting_entity_name obj['priority'] = self.priority obj['startEpochMicrosec'] = self.start_epoch_microsec obj['lastEpochMicrosec'] = self.last_epoch_micro_sec obj['sequence'] = self.sequence + obj['eventName'] = self.event_name + obj['internalHeaderFields'] = self.internal_header_fields + obj['nfcNamingCode'] = self.nfc_naming_code + obj['nfNamingCode'] = self.nf_naming_code return json.dumps({ 'event' : { 'commonEventHeader' : obj, @@ -83,25 +90,315 @@ class Event(object): def get_obj(): assert False, 'abstract method get_obj() is not implemented' -class MeasurementGroup(object): - """MeasurementGroup datatype""" +class Field(object): + """field datatype""" + + def __init__(self, name, value): + self.name = name + self.value = value + + def get_obj(self): + return { + 'name' : self.name, + 'value' : self.value + } + +class NamedArrayOfFields(object): + """namedArrayOfFields datatype""" def __init__(self, name): self.name = name - self.measurement = [] - pass + self.array_of_fields = [] - def add_measurement(self, name, value): - self.measurement.append({ - 'name' : name, - 'value' : value - }) + def add(self, field): + self.array_of_fields.append(field.get_obj()) def get_obj(self): return { 'name' : self.name, - 'measurements' : self.measurement + 'arrayOfFields' : self.array_of_fields + } + +class VESDataType(object): + """ Base VES datatype """ + + def set_optional(self, obj, key, val): + if val is not None: + obj[key] = val + +class DiskUsage(VESDataType): + """diskUsage datatype""" + + def __init__(self, identifier): + self.disk_identifier = identifier + self.disk_io_time_avg = None + self.disk_io_time_last = None + self.disk_io_time_max = None + self.disk_io_time_min = None + self.disk_merged_read_avg = None + self.disk_merged_read_last = None + self.disk_merged_read_max = None + self.disk_merged_read_min = None + self.disk_merged_write_avg = None + self.disk_merged_write_last = None + self.disk_merged_write_max = None + self.disk_merged_write_min = None + self.disk_octets_read_avg = None + self.disk_octets_read_last = None + self.disk_octets_read_max = None + self.disk_octets_read_min = None + self.disk_octets_write_avg = None + self.disk_octets_write_last = None + self.disk_octets_write_max = None + self.disk_octets_write_min = None + self.disk_ops_read_avg = None + self.disk_ops_read_last = None + self.disk_ops_read_max = None + self.disk_ops_read_min = None + self.disk_ops_write_avg = None + self.disk_ops_write_last = None + self.disk_ops_write_max = None + self.disk_ops_write_min = None + self.disk_pending_operations_avg = None + self.disk_pending_operations_last = None + self.disk_pending_operations_max = None + self.disk_pending_operations_min = None + self.disk_time_read_avg = None + self.disk_time_read_last = None + self.disk_time_read_max = None + self.disk_time_read_min = None + self.disk_time_write_avg = None + self.disk_time_write_last = None + self.disk_time_write_max = None + self.disk_time_write_min = None + + def get_obj(self): + obj = { + # required + 'diskIdentifier' : self.disk_identifier + } + self.set_optional(obj, 'diskIoTimeAvg', self.disk_io_time_avg) + self.set_optional(obj, 'diskIoTimeLast', self.disk_io_time_last) + self.set_optional(obj, 'diskIoTimeMax', self.disk_io_time_max) + self.set_optional(obj, 'diskIoTimeMin', self.disk_io_time_min) + self.set_optional(obj, 'diskMergedReadAvg', self.disk_merged_read_avg) + self.set_optional(obj, 'diskMergedReadLast', self.disk_merged_read_last) + self.set_optional(obj, 'diskMergedReadMax', self.disk_merged_read_max) + self.set_optional(obj, 'diskMergedReadMin', self.disk_merged_read_min) + self.set_optional(obj, 'diskMergedWriteAvg', self.disk_merged_write_avg) + self.set_optional(obj, 'diskMergedWriteLast', self.disk_merged_write_last) + self.set_optional(obj, 'diskMergedWriteMax', self.disk_merged_write_max) + self.set_optional(obj, 'diskMergedWriteMin', self.disk_merged_write_min) + self.set_optional(obj, 'diskOctetsReadAvg', self.disk_octets_read_avg) + self.set_optional(obj, 'diskOctetsReadLast', self.disk_octets_read_last) + self.set_optional(obj, 'diskOctetsReadMax', self.disk_octets_read_max) + self.set_optional(obj, 'diskOctetsReadMin', self.disk_octets_read_min) + self.set_optional(obj, 'diskOctetsWriteAvg', self.disk_octets_write_avg) + self.set_optional(obj, 'diskOctetsWriteLast', self.disk_octets_write_last) + self.set_optional(obj, 'diskOctetsWriteMax', self.disk_octets_write_max) + self.set_optional(obj, 'diskOctetsWriteMin', self.disk_octets_write_min) + self.set_optional(obj, 'diskOpsReadAvg', self.disk_ops_read_avg) + self.set_optional(obj, 'diskOpsReadLast', self.disk_ops_read_last) + self.set_optional(obj, 'diskOpsReadMax', self.disk_ops_read_max) + self.set_optional(obj, 'diskOpsReadMin', self.disk_ops_read_min) + self.set_optional(obj, 'diskOpsWriteAvg', self.disk_ops_write_avg) + self.set_optional(obj, 'diskOpsWriteLast', self.disk_ops_write_last) + self.set_optional(obj, 'diskOpsWriteMax', self.disk_ops_write_max) + self.set_optional(obj, 'diskOpsWriteMin', self.disk_ops_write_min) + self.set_optional(obj, 'diskPendingOperationsAvg', self.disk_pending_operations_avg) + self.set_optional(obj, 'diskPendingOperationsLast', self.disk_pending_operations_last) + self.set_optional(obj, 'diskPendingOperationsMax', self.disk_pending_operations_max) + self.set_optional(obj, 'diskPendingOperationsMin', self.disk_pending_operations_min) + self.set_optional(obj, 'diskTimeReadAvg', self.disk_time_read_avg) + self.set_optional(obj, 'diskTimeReadLast', self.disk_time_read_last) + self.set_optional(obj, 'diskTimeReadMax', self.disk_time_read_max) + self.set_optional(obj, 'diskTimeReadMin', self.disk_time_read_min) + self.set_optional(obj, 'diskTimeWriteAvg', self.disk_time_write_avg) + self.set_optional(obj, 'diskTimeWriteLast', self.disk_time_write_last) + self.set_optional(obj, 'diskTimeWriteMax', self.disk_time_write_max) + self.set_optional(obj, 'diskTimeWriteMin', self.disk_time_write_min) + return obj + +class VNicPerformance(VESDataType): + """vNicPerformance datatype""" + + def __init__(self, identifier): + self.received_broadcast_packets_accumulated = None + self.received_broadcast_packets_delta = None + self.received_discarded_packets_accumulated = None + self.received_discarded_packets_delta = None + self.received_error_packets_accumulated = None + self.received_error_packets_delta = None + self.received_multicast_packets_accumulated = None + self.received_multicast_packets_delta = None + self.received_octets_accumulated = None + self.received_octets_delta = None + self.received_total_packets_accumulated = None + self.received_total_packets_delta = None + self.received_unicast_packets_accumulated = None + self.received_unicast_packets_delta = None + self.transmitted_broadcast_packets_accumulated = None + self.transmitted_broadcast_packets_delta = None + self.transmitted_discarded_packets_accumulated = None + self.transmitted_discarded_packets_delta = None + self.transmitted_error_packets_accumulated = None + self.transmitted_error_packets_delta = None + self.transmitted_multicast_packets_accumulated = None + self.transmitted_multicast_packets_delta = None + self.transmitted_octets_accumulated = None + self.transmitted_octets_delta = None + self.transmitted_total_packets_accumulated = None + self.transmitted_total_packets_delta = None + self.transmitted_unicast_packets_accumulated = None + self.transmitted_unicast_packets_delta = None + self.values_are_suspect = 'true' + self.v_nic_identifier = identifier + + def get_obj(self): + obj = { + # required + 'valuesAreSuspect' : self.values_are_suspect, + 'vNicIdentifier' : self.v_nic_identifier + } + # optional + self.set_optional(obj, 'receivedBroadcastPacketsAccumulated', self.received_broadcast_packets_accumulated) + self.set_optional(obj, 'receivedBroadcastPacketsDelta', self.received_broadcast_packets_delta) + self.set_optional(obj, 'receivedDiscardedPacketsAccumulated', self.received_discarded_packets_accumulated) + self.set_optional(obj, 'receivedDiscardedPacketsDelta', self.received_discarded_packets_delta) + self.set_optional(obj, 'receivedErrorPacketsAccumulated', self.received_error_packets_accumulated) + self.set_optional(obj, 'receivedErrorPacketsDelta', self.received_error_packets_delta) + self.set_optional(obj, 'receivedMulticastPacketsAccumulated', self.received_multicast_packets_accumulated) + self.set_optional(obj, 'receivedMulticastPacketsDelta', self.received_multicast_packets_delta) + self.set_optional(obj, 'receivedOctetsAccumulated', self.received_octets_accumulated) + self.set_optional(obj, 'receivedOctetsDelta', self.received_octets_delta) + self.set_optional(obj, 'receivedTotalPacketsAccumulated', self.received_total_packets_accumulated) + self.set_optional(obj, 'receivedTotalPacketsDelta', self.received_total_packets_delta) + self.set_optional(obj, 'receivedUnicastPacketsAccumulated', self.received_unicast_packets_accumulated) + self.set_optional(obj, 'receivedUnicastPacketsDelta', self.received_unicast_packets_delta) + self.set_optional(obj, 'transmittedBroadcastPacketsAccumulated', self.transmitted_broadcast_packets_accumulated) + self.set_optional(obj, 'transmittedBroadcastPacketsDelta', self.transmitted_broadcast_packets_delta) + self.set_optional(obj, 'transmittedDiscardedPacketsAccumulated', self.transmitted_discarded_packets_accumulated) + self.set_optional(obj, 'transmittedDiscardedPacketsDelta', self.transmitted_discarded_packets_delta) + self.set_optional(obj, 'transmittedErrorPacketsAccumulated', self.transmitted_error_packets_accumulated) + self.set_optional(obj, 'transmittedErrorPacketsDelta', self.transmitted_error_packets_delta) + self.set_optional(obj, 'transmittedMulticastPacketsAccumulated', self.transmitted_multicast_packets_accumulated) + self.set_optional(obj, 'transmittedMulticastPacketsDelta', self.transmitted_multicast_packets_delta) + self.set_optional(obj, 'transmittedOctetsAccumulated', self.transmitted_octets_accumulated) + self.set_optional(obj, 'transmittedOctetsDelta', self.transmitted_octets_delta) + self.set_optional(obj, 'transmittedTotalPacketsAccumulated', self.transmitted_total_packets_accumulated) + self.set_optional(obj, 'transmittedTotalPacketsDelta', self.transmitted_total_packets_delta) + self.set_optional(obj, 'transmittedUnicastPacketsAccumulated', self.transmitted_unicast_packets_accumulated) + self.set_optional(obj, 'transmittedUnicastPacketsDelta', self.transmitted_unicast_packets_delta) + return obj + +class CpuUsage(VESDataType): + """cpuUsage datatype""" + + def __init__(self, identifier): + self.cpu_identifier = identifier + self.cpu_idle = None + self.cpu_usage_interrupt = None + self.cpu_usage_nice = None + self.cpu_usage_soft_irq = None + self.cpu_usage_steal = None + self.cpu_usage_system = None + self.cpu_usage_user = None + self.cpu_wait = None + self.percent_usage = 0 + + def get_obj(self): + obj = { + # required + 'cpuIdentifier' : self.cpu_identifier, + 'percentUsage' : self.percent_usage + } + # optional + self.set_optional(obj, 'cpuIdle', self.cpu_idle) + self.set_optional(obj, 'cpuUsageInterrupt', self.cpu_usage_interrupt) + self.set_optional(obj, 'cpuUsageNice', self.cpu_usage_nice) + self.set_optional(obj, 'cpuUsageSoftIrq', self.cpu_usage_soft_irq) + self.set_optional(obj, 'cpuUsageSteal', self.cpu_usage_steal) + self.set_optional(obj, 'cpuUsageSystem', self.cpu_usage_system) + self.set_optional(obj, 'cpuUsageUser', self.cpu_usage_user) + self.set_optional(obj, 'cpuWait', self.cpu_wait) + return obj + +class MemoryUsage(VESDataType): + """memoryUsage datatype""" + + def __init__(self, identifier): + self.memory_buffered = None + self.memory_cached = None + self.memory_configured = None + self.memory_free = None + self.memory_slab_recl = None + self.memory_slab_unrecl = None + self.memory_used = None + self.vm_identifier = identifier + + def __str__(self): + """ for debug purposes """ + return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n'\ + 'cached : {cached}\nconfigured : {configured}\nfree : {free}\n'\ + 'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n'\ + 'used : {used}\n'.format(buffered = self.memory_buffered, + cached = self.memory_cached, configured = self.memory_configured, + free = self.memory_free, slab_recl = self.memory_slab_recl, + slab_unrecl = self.memory_slab_unrecl, used = self.memory_used, + vm_identifier = self.vm_identifier) + + def get_memory_free(self): + if self.memory_free is None: + # calculate the free memory + if None not in (self.memory_configured, self.memory_used): + return self.memory_configured - self.memory_used + else: + # required field, so return zero + return 0 + else: + return self.memory_free + + def get_memory_used(self): + if self.memory_used is None: + # calculate the memory used + if None not in (self.memory_configured, self.memory_free, self.memory_buffered, + self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): + return self.memory_configured - (self.memory_free + + self.memory_buffered + self.memory_cached + + self.memory_slab_recl + self.memory_slab_unrecl) + else: + # required field, so return zero + return 0 + else: + return self.memory_used + + def get_memory_total(self): + if self.memory_configured is None: + # calculate the total memory + if None not in (self.memory_used, self.memory_free, self.memory_buffered, + self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): + return (self.memory_used + self.memory_free + + self.memory_buffered + self.memory_cached + + self.memory_slab_recl + self.memory_slab_unrecl) + else: + return None + else: + return self.memory_configured + + def get_obj(self): + obj = { + # required fields + 'memoryFree' : self.get_memory_free(), + 'memoryUsed' : self.get_memory_used(), + 'vmIdentifier' : self.vm_identifier } + # optional fields + self.set_optional(obj, 'memoryBuffered', self.memory_buffered) + self.set_optional(obj, 'memoryCached', self.memory_cached) + self.set_optional(obj, 'memoryConfigured', self.memory_configured) + self.set_optional(obj, 'memorySlabRecl', self.memory_slab_recl) + self.set_optional(obj, 'memorySlabUnrecl', self.memory_slab_unrecl) + return obj class MeasurementsForVfScaling(Event): """MeasurementsForVfScaling datatype""" @@ -111,79 +408,69 @@ class MeasurementsForVfScaling(Event): super(MeasurementsForVfScaling, self).__init__() # common attributes self.domain = "measurementsForVfScaling" + self.event_type = 'hostOS' self.event_id = event_id # measurement attributes self.additional_measurements = [] - self.aggregate_cpu_usage = 0 self.codec_usage_array = [] self.concurrent_sessions = 0 self.configured_entities = 0 self.cpu_usage_array = [] - self.errors = { - "receiveDiscards" : 0, - "receiveErrors" : 0, - "transmitDiscards" : 0, - "transmitErrors" : 0 - } self.feature_usage_array = [] self.filesystem_usage_array = [] self.latency_distribution = [] self.mean_request_latency = 0 - self.measurement_fields_version = 1.1 self.measurement_interval = 0 - self.memory_configured = 0 - self.memory_used = 0 self.number_of_media_ports_in_use = 0 self.request_rate = 0 self.vnfc_scaling_metric = 0 - self.v_nic_usage_array = [] - - def add_measurement_group(self, group): - self.additional_measurements.append(group.get_obj()) - - def add_cpu_usage(self, cpu_identifier, usage): - self.cpu_usage_array.append({ - 'cpuIdentifier' : cpu_identifier, - 'percentUsage' : usage - }) - - def add_v_nic_usage(self, if_name, if_pkts, if_bytes): - self.v_nic_usage_array.append({ - 'broadcastPacketsIn' : 0.0, - 'broadcastPacketsOut' : 0.0, - 'multicastPacketsIn' : 0.0, - 'multicastPacketsOut' : 0.0, - 'unicastPacketsIn' : 0.0, - 'unicastPacketsOut' : 0.0, - 'vNicIdentifier' : if_name, - 'packetsIn' : if_pkts[0], - 'packetsOut' : if_pkts[1], - 'bytesIn' : if_bytes[0], - 'bytesOut' : if_bytes[1] - }) + self.additional_fields = [] + self.additional_objects = [] + self.disk_usage_array = [] + self.measurements_for_vf_scaling_version = 2.0 + self.memory_usage_array = [] + self.v_nic_performance_array = [] + + def add_additional_measurement(self, named_array): + self.additional_measurements.append(named_array.get_obj()) + + def add_additional_fields(self, field): + self.additional_fields.append(field.get_obj()) + + def add_memory_usage(self, mem_usage): + self.memory_usage_array.append(mem_usage.get_obj()) + + def add_cpu_usage(self, cpu_usage): + self.cpu_usage_array.append(cpu_usage.get_obj()) + + def add_v_nic_performance(self, nic_performance): + self.v_nic_performance_array.append(nic_performance.get_obj()) + + def add_disk_usage(self, disk_usage): + self.disk_usage_array.append(disk_usage.get_obj()) def get_obj(self): """Get the object of the datatype""" obj = {} obj['additionalMeasurements'] = self.additional_measurements - obj['aggregateCpuUsage'] = self.aggregate_cpu_usage obj['codecUsageArray'] = self.codec_usage_array obj['concurrentSessions'] = self.concurrent_sessions obj['configuredEntities'] = self.configured_entities obj['cpuUsageArray'] = self.cpu_usage_array - obj['errors'] = self.errors obj['featureUsageArray'] = self.feature_usage_array obj['filesystemUsageArray'] = self.filesystem_usage_array obj['latencyDistribution'] = self.latency_distribution obj['meanRequestLatency'] = self.mean_request_latency - obj['measurementFieldsVersion'] = self.measurement_fields_version obj['measurementInterval'] = self.measurement_interval - obj['memoryConfigured'] = self.memory_configured - obj['memoryUsed'] = self.memory_used obj['numberOfMediaPortsInUse'] = self.number_of_media_ports_in_use obj['requestRate'] = self.request_rate obj['vnfcScalingMetric'] = self.vnfc_scaling_metric - obj['vNicUsageArray'] = self.v_nic_usage_array + obj['additionalFields'] = self.additional_fields + obj['additionalObjects'] = self.additional_objects + obj['diskUsageArray'] = self.disk_usage_array + obj['measurementsForVfScalingVersion'] = self.measurements_for_vf_scaling_version + obj['memoryUsageArray'] = self.memory_usage_array + obj['vNicPerformanceArray'] = self.v_nic_performance_array return obj def get_name(self): @@ -209,6 +496,7 @@ class Fault(Event): self.vf_status = 'Active' self.alarm_interface_a = '' self.alarm_additional_information = [] + self.event_category = "" def get_name(self): """Name of datatype""" @@ -225,6 +513,7 @@ class Fault(Event): obj['vfStatus'] = self.vf_status obj['alarmInterfaceA'] = self.alarm_interface_a obj['alarmAdditionalInformation'] = self.alarm_additional_information + obj['eventCategory'] = self.event_category return obj class VESPlugin(object): @@ -234,7 +523,6 @@ class VESPlugin(object): """Plugin initialization""" self.__plugin_data_cache = { 'cpu' : {'interval' : 0.0, 'vls' : []}, - 'cpu-aggregation' : {'interval' : 0.0, 'vls' : []}, 'virt' : {'interval' : 0.0, 'vls' : []}, 'disk' : {'interval' : 0.0, 'vls' : []}, 'interface' : {'interval' : 0.0, 'vls' : []}, @@ -250,8 +538,7 @@ class VESPlugin(object): 'UseHttps' : False, 'SendEventInterval' : 20.0, 'FunctionalRole' : 'Collectd VES Agent', - 'GuestRunning' : False, - 'ApiVersion' : 5.1 + 'ApiVersion' : 5.1 } self.__host_name = None self.__ves_timer = None @@ -311,172 +598,176 @@ class VESPlugin(object): except: collectd.error('Vendor Event Listener unknown error') - def bytes_to_gb(self, bytes): - """Convert bytes to GB""" - return round((bytes / 1073741824.0), 3) + def bytes_to_kb(self, bytes): + """Convert bytes to kibibytes""" + return round((bytes / 1024.0), 3) def get_hostname(self): if len(self.__host_name): return self.__host_name return socket.gethostname() + def send_host_measurements(self): + # get list of all VMs + virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total', + mark_as_read=False) + vm_names = [x['plugin_instance'] for x in virt_vcpu_total] + for vm_name in vm_names: + # make sure that 'virt' plugin cache is up-to-date + vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name, + mark_as_read=False) + us_up_to_date = True + for vm_value in vm_values: + if vm_value['updated'] == False: + us_up_to_date = False + break + if not us_up_to_date: + # one of the cache value is not up-to-date, break + collectd.warning("virt collectD cache values are not up-to-date for {}".format(vm_name)) + continue + # values are up-to-date, create an event message + measurement = MeasurementsForVfScaling(self.get_event_id()) + measurement.functional_role = self.__plugin_config['FunctionalRole'] + # fill out reporting_entity + measurement.reporting_entity_id = self.get_hostname() + measurement.reporting_entity_name = measurement.reporting_entity_id + # set source as a host value + measurement.source_id = vm_name + measurement.source_name = measurement.source_id + # fill out EpochMicrosec (convert to us) + measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000) + # plugin interval + measurement.measurement_interval = self.__plugin_data_cache['virt']['interval'] + # memoryUsage + mem_usage = MemoryUsage(vm_name) + memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='memory', type_instance='total') + memory_unused = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='memory', type_instance='unused') + memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='memory', type_instance='rss') + if len(memory_total) > 0: + mem_usage.memory_configured = self.bytes_to_kb(memory_total[0]['values'][0]) + if len(memory_unused) > 0: + mem_usage.memory_free = self.bytes_to_kb(memory_unused[0]['values'][0]) + elif len(memory_rss) > 0: + mem_usage.memory_free = self.bytes_to_kb(memory_rss[0]['values'][0]) + # since, "used" metric is not provided by virt plugn, set the rest of the memory stats + # to zero to calculate used based on provided stats only + mem_usage.memory_buffered = mem_usage.memory_cached = mem_usage.memory_slab_recl = \ + mem_usage.memory_slab_unrecl = 0 + measurement.add_memory_usage(mem_usage) + # cpuUsage + virt_vcpus = self.cache_get_value(plugin_instance=vm_name, + plugin_name='virt', type_name='virt_vcpu') + for virt_vcpu in virt_vcpus: + cpu_usage = CpuUsage(virt_vcpu['type_instance']) + cpu_usage.percent_usage = self.cpu_ns_to_percentage(virt_vcpu) + measurement.add_cpu_usage(cpu_usage) + # vNicPerformance + if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='if_packets', mark_as_read=False) + if_names = [x['type_instance'] for x in if_packets] + for if_name in if_names: + if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='if_packets', type_instance=if_name) + if_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='if_octets', type_instance=if_name) + if_errors = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='if_errors', type_instance=if_name) + if_dropped = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='if_dropped', type_instance=if_name) + v_nic_performance = VNicPerformance(if_name) + v_nic_performance.received_total_packets_accumulated = if_packets[0]['values'][0] + v_nic_performance.transmitted_total_packets_accumulated = if_packets[0]['values'][1] + v_nic_performance.received_octets_accumulated = if_octets[0]['values'][0] + v_nic_performance.transmitted_octets_accumulated = if_octets[0]['values'][1] + v_nic_performance.received_error_packets_accumulated = if_errors[0]['values'][0] + v_nic_performance.transmitted_error_packets_accumulated = if_errors[0]['values'][1] + v_nic_performance.received_discarded_packets_accumulated = if_dropped[0]['values'][0] + v_nic_performance.transmitted_discarded_packets_accumulated = if_dropped[0]['values'][1] + measurement.add_v_nic_performance(v_nic_performance) + # diskUsage + disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='disk_octets', mark_as_read=False) + disk_names = [x['type_instance'] for x in disk_octets] + for disk_name in disk_names: + disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='disk_octets', type_instance=disk_name) + disk_ops = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='disk_ops', type_instance=disk_name) + disk_usage = DiskUsage(disk_name) + disk_usage.disk_octets_read_last = disk_octets[0]['values'][0] + disk_usage.disk_octets_write_last = disk_octets[0]['values'][1] + disk_usage.disk_ops_read_last = disk_ops[0]['values'][0] + disk_usage.disk_ops_write_last = disk_ops[0]['values'][1] + measurement.add_disk_usage(disk_usage) + # add additional measurements (perf) + perf_values = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', + type_name='perf') + named_array = NamedArrayOfFields('perf') + for perf in perf_values: + named_array.add(Field(perf['type_instance'], str(perf['values'][0]))) + measurement.add_additional_measurement(named_array) + # add host values as additional measurements + self.set_additional_fields(measurement, exclude_plugins=['virt']) + # send event to the VES + self.event_send(measurement) + if len(vm_names) > 0: + # mark the additional measurements metrics as read + self.mark_cache_values_as_read(exclude_plugins=['virt']) + def event_timer(self): """Event timer thread""" self.lock() try: - if (self.__plugin_config['GuestRunning']): - # if we running on a guest only, send 'additionalMeasurements' only - measurement = MeasurementsForVfScaling(self.get_event_id()) - measurement.functional_role = self.__plugin_config['FunctionalRole'] - # add host/guest values as additional measurements - self.fill_additional_measurements(measurement, exclude_plugins=[ - 'cpu', 'cpu-aggregation', 'memory', 'disk', 'interface', 'virt']) - # fill out reporting & source entities - reporting_entity = self.get_hostname() - measurement.reporting_entity_id = reporting_entity - measurement.reporting_entity_name = reporting_entity - measurement.source_id = reporting_entity - measurement.source_name = measurement.source_id - measurement.start_epoch_microsec = (time.time() * 1000000) - measurement.measurement_interval = self.__plugin_config['SendEventInterval'] - # total CPU - total_cpu_system = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='system') - total_cpu_user = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='user') - measurement.aggregate_cpu_usage = round(total_cpu_system[0]['values'][0] + - total_cpu_user[0]['values'][0], 2) - # CPU per each instance - cpux_system = self.cache_get_value(plugin_name='cpu', type_instance='system', - mark_as_read = False) - for cpu_inst in [x['plugin_instance'] for x in cpux_system]: - cpu_system = self.cache_get_value(plugin_name='cpu', - plugin_instance=cpu_inst, type_instance='system') - cpu_user = self.cache_get_value(plugin_name='cpu', - plugin_instance=cpu_inst, type_instance='user') - cpu_usage = round(cpu_system[0]['values'][0] + cpu_user[0]['values'][0], 2) - measurement.add_cpu_usage(cpu_inst, cpu_usage) - # fill memory used - memory_used = self.cache_get_value(plugin_name='memory', type_name='memory', type_instance='used') - if len(memory_used) > 0: - measurement.memory_used = self.bytes_to_gb(memory_used[0]['values'][0]) - # if_packets - ifinfo = {} - if_stats = self.cache_get_value(plugin_name='interface', type_name='if_packets') - if len(if_stats) > 0: - for if_stat in if_stats: - ifinfo[if_stat['plugin_instance']] = { - 'pkts' : (if_stat['values'][0], if_stat['values'][1]) - } - # go through all interfaces and get if_octets - for if_name in ifinfo.keys(): - if_stats = self.cache_get_value(plugin_instance=if_name, plugin_name='interface', - type_name='if_octets') - if len(if_stats) > 0: - ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1]) - # fill vNicUsageArray filed in the event - for if_name in ifinfo.keys(): - measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes']) - # send event to the VES - self.event_send(measurement) - return - # get list of all VMs - virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total', - mark_as_read=False) - vm_names = [x['plugin_instance'] for x in virt_vcpu_total] - for vm_name in vm_names: - # make sure that 'virt' plugin cache is up-to-date - vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name, - mark_as_read=False) - us_up_to_date = True - for vm_value in vm_values: - if vm_value['updated'] == False: - us_up_to_date = False - break - if not us_up_to_date: - # one of the cache value is not up-to-date, break - collectd.warning("virt collectD cache values are not up-to-date for {}".format(vm_name)) - continue - # if values are up-to-date, create an event message - measurement = MeasurementsForVfScaling(self.get_event_id()) - measurement.functional_role = self.__plugin_config['FunctionalRole'] - # fill out reporting_entity - reporting_entity = '{}-{}-{}'.format(self.get_hostname(), 'virt', vm_name) - measurement.reporting_entity_id = reporting_entity - measurement.reporting_entity_name = reporting_entity - # virt_vcpu - virt_vcpus = self.cache_get_value(plugin_instance=vm_name, - plugin_name='virt', type_name='virt_vcpu') - if len(virt_vcpus) > 0: - for virt_vcpu in virt_vcpus: - cpu_usage = self.cpu_ns_to_percentage(virt_vcpu) - measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage) - # virt_cpu_total - virt_vcpu_total = self.cache_get_value(plugin_instance=vm_name, - plugin_name='virt', type_name='virt_cpu_total') - if len(virt_vcpu_total) > 0: - average_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0]) / len(virt_vcpus) - measurement.aggregate_cpu_usage = average_cpu_usage - # set source as a host for virt_vcpu_total value - measurement.source_id = virt_vcpu_total[0]['host'] - measurement.source_name = measurement.source_id - # fill out EpochMicrosec (convert to us) - measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000) - # plugin interval - measurement.measurement_interval = self.__plugin_data_cache['virt']['interval'] - # memory-total - memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='memory', type_instance='total') - if len(memory_total) > 0: - measurement.memory_configured = self.bytes_to_gb(memory_total[0]['values'][0]) - # memory-rss - memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='memory', type_instance='rss') - if len(memory_rss) > 0: - measurement.memory_used = self.bytes_to_gb(memory_rss[0]['values'][0]) - # if_packets - ifinfo = {} - if_stats = self.cache_get_value(plugin_instance=vm_name, - plugin_name='virt', type_name='if_packets') - if len(if_stats) > 0: - for if_stat in if_stats: - ifinfo[if_stat['type_instance']] = { - 'pkts' : (if_stat['values'][0], if_stat['values'][1]) - } - # go through all interfaces and get if_octets - for if_name in ifinfo.keys(): - if_stats = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_octets', type_instance=if_name) - if len(if_stats) > 0: - ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1]) - # fill vNicUsageArray filed in the event - for if_name in ifinfo.keys(): - measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes']) - # add host/guest values as additional measurements - self.fill_additional_measurements(measurement, ['virt']) - # send event to the VES - self.event_send(measurement) + self.send_host_measurements() finally: self.unlock() - def fill_additional_measurements(self, measurement, exclude_plugins=None): - """Fill out addition measurement filed with host/guets values""" + def mark_cache_values_as_read(self, exclude_plugins=None): + """mark the cache values as read""" + for plugin_name in self.__plugin_data_cache.keys(): + if (exclude_plugins != None and plugin_name in exclude_plugins): + # skip excluded plugins + continue; + for val in self.__plugin_data_cache[plugin_name]['vls']: + val['updated'] = False + + def set_additional_measurements(self, measurement, exclude_plugins=None): + """Set addition measurement filed with host/guets values""" # add host/guest values as additional measurements for plugin_name in self.__plugin_data_cache.keys(): if (exclude_plugins != None and plugin_name in exclude_plugins): - # skip host-only values + # skip excluded plugins continue; for val in self.__plugin_data_cache[plugin_name]['vls']: if val['updated']: - mgroup_name = '{}{}{}'.format(plugin_name, - '-{}'.format(val['plugin_instance']) if len(val['plugin_instance']) else '', - '-{}'.format(val['type_instance']) if len(val['type_instance']) else '') - mgroup = MeasurementGroup(mgroup_name) + array_name = self.make_dash_string(plugin_name, val['plugin_instance'], + val['type_instance']) + named_array = NamedArrayOfFields(array_name) ds = collectd.get_dataset(val['type']) for index in range(len(ds)): mname = '{}-{}'.format(val['type'], ds[index][0]) - mgroup.add_measurement(mname, str(val['values'][index])) - measurement.add_measurement_group(mgroup); + named_array.add(Field(mname, str(val['values'][index]))) + measurement.add_additional_measurement(named_array); val['updated'] = False + def set_additional_fields(self, measurement, exclude_plugins=None): + # set host values as additional fields + for plugin_name in self.__plugin_data_cache.keys(): + if (exclude_plugins != None and plugin_name in exclude_plugins): + # skip excluded plugins + continue; + for val in self.__plugin_data_cache[plugin_name]['vls']: + if val['updated']: + name_prefix = self.make_dash_string(plugin_name, val['plugin_instance'], + val['type_instance']) + ds = collectd.get_dataset(val['type']) + for index in range(len(ds)): + field_name = self.make_dash_string(name_prefix, val['type'], ds[index][0]) + measurement.add_additional_fields(Field(field_name, str(val['values'][index]))) + def cpu_ns_to_percentage(self, vl): """Convert CPU usage ns to CPU %""" total = vl['values'][0] @@ -607,15 +898,21 @@ class VESPlugin(object): fault.functional_role = self.__plugin_config['FunctionalRole'] fault.reporting_entity_id = self.get_hostname() fault.reporting_entity_name = self.get_hostname() - fault.source_id = self.get_hostname() - fault.source_name = self.get_hostname() + if n.plugin == 'virt': + # if the notification is generated by virt plugin, + # use the plugin_instance (e.g. VM name) as a source. + fault.source_id = str(n.plugin_instance) + fault.source_name = fault.source_id + else: + fault.source_id = self.get_hostname() + fault.source_name = self.get_hostname() fault.start_epoch_microsec = (n.time * 1000000) fault.last_epoch_micro_sec = fault.start_epoch_microsec # fill out fault header fault.event_severity = collectd_event_severity_map[n.severity] fault.specific_problem = self.make_dash_string(n.plugin_instance, n.type_instance) fault.alarm_interface_a = self.make_dash_string(n.plugin, n.plugin_instance) - fault.event_source_type = 'virtualMachine(8)' if self.__plugin_config['GuestRunning'] else 'host(3)' + fault.event_source_type = 'host(3)' fault.alarm_condition = n.message self.event_send(fault) diff --git a/3rd_party/ovs_pmd_stats/ovs_pmd_stats.py b/3rd_party/ovs_pmd_stats/ovs_pmd_stats.py new file mode 100755 index 00000000..fc6045b9 --- /dev/null +++ b/3rd_party/ovs_pmd_stats/ovs_pmd_stats.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# +# Copyright(c) 2017 Intel Corporation. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# Roman Korynkevych <romanx.korynkevych@intel.com> + +import socket +import argparse +import json +import logging + +HOSTNAME = socket.gethostname() +PROG_NAME = 'ovs_pmd_stats' +TYPE = 'counter' + +MAIN_THREAD = 'main thread' +PMD_THREAD = 'pmd thread' + +REQUEST_MESSAGE = '{"id":0,"method":"dpif-netdev/pmd-stats-show","params":[]}' +RESPONSE_MESSAGE_TIMEOUT = 1.0 + +# Setup arguments +parser = argparse.ArgumentParser(prog=PROG_NAME) +parser.add_argument('--socket-pid-file', required=True, help='ovs-vswitchd.pid file location') +args = parser.parse_args() + +try: + fp = open(args.socket_pid_file, 'r') + pid = fp.readline() + fp.close() +except IOError as e: + logging.error('I/O error({}): {}'.format(e.errno, e.strerror)) + raise SystemExit() +except: + logging.error('Unexpected error:', sys.exc_info()[0]) + raise SystemExit() + +server_address = args.socket_pid_file.replace('.pid', '.{}.ctl'.format(pid.strip())) + +# open unix socket to ovs-vswitch +sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) +try: + sock.connect(server_address) +except socket.error as msg: + logging.error('Socket address: {} Error: {}'.format(server_address, msg)) + raise SystemExit() + +# set timeout +sock.settimeout(RESPONSE_MESSAGE_TIMEOUT) + +# send request +sock.sendall(REQUEST_MESSAGE) + +# listen for respnse message +rdata = '' +while True: + try: + rdata += sock.recv(4096) + + if rdata.count('{') == rdata.count('}'): + break + except socket.timeout: + logging.error('Response message has not been received in {} sec.'.format(RESPONSE_MESSAGE_TIMEOUT)) + raise SystemExit() + except socket.error as e: + logging.error('Error received while reading: {}'.format(e.strerror)) + raise SystemExit() + +# parse the message +try: + s = json.loads(rdata, strict=False) +except ValueError as e: + logging.error('Failed to parse JSON response: {}'.format(e.strerror)) + raise SystemExit() + +# check for key string presence in the string +if 'result' not in s or 'id' not in s or 'error' not in s: + logging.error("One of the keys: ['id'], ['result'], ['error'] is missed in the response") + logging.error('Msg: {}'.format(s)) + raise SystemExit() + +array = s['result'].replace('\t', '').splitlines() + +# submit metrics in collectd format +plugin_instance = '' +for el in array: + if MAIN_THREAD in el or PMD_THREAD in el: + plugin_instance = el[:-1].replace(' ', '_') + else: + type_instance = el.split(':')[0].replace(' ', "_") + value = el.split(':')[1].split(' ')[0] + print('PUTVAL %s/%s-%s/%s-%s N:%s' % (HOSTNAME, PROG_NAME, plugin_instance, TYPE, type_instance, value)) + +# close socket +sock.close() + diff --git a/baro_tests/barometer.py b/baro_tests/barometer.py new file mode 100644 index 00000000..e210f333 --- /dev/null +++ b/baro_tests/barometer.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +import logging + +from baro_tests import collectd + +import functest.core.feature as base + + +class BarometerCollectd(base.Feature): + ''' + Class for executing barometercollectd testcase. + ''' + + __logger = logging.getLogger(__name__) + + def execute(self): + return collectd.main(self.__logger) diff --git a/ci/utility/collectd_build_rpm.sh b/ci/utility/collectd_build_rpm.sh index 2190d074..086aa979 100755 --- a/ci/utility/collectd_build_rpm.sh +++ b/ci/utility/collectd_build_rpm.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2017 Intel Corporation +# Copyright 2017 Intel Corporation and OPNFV # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ source $DIR/package-list.sh VERSION="VERSION_NOT_SET" +rm -rf $RPM_WORKDIR cd $COLLECTD_DIR VERSION=$( $COLLECTD_DIR/version-gen.sh | sed "s/\W$//g" ) $COLLECTD_DIR/build.sh @@ -36,5 +37,4 @@ sed --regexp-extended \ --expression="s/without_intel_rdt:[0-9]/without_intel_rdt:1/g" \ $COLLECTD_DIR/contrib/redhat/collectd.spec -rpmbuild --define "_topdir $RPM_WORKDIR" -bb $COLLECTD_DIR/contrib/redhat/collectd.spec -gsutil -m cp -r $RPM_WORKDIR/RPMS/* gs://artifacts.opnfv.org/barometer/rpms +rpmbuild --define "_topdir $RPM_WORKDIR" -bb $COLLECTD_DIR/contrib/redhat/collectd.spec
\ No newline at end of file |