diff options
Diffstat (limited to '3rd_party/collectd-ves-app/ves_app')
-rw-r--r-- | 3rd_party/collectd-ves-app/ves_app/__init__.py | 0 | ||||
-rw-r--r-- | 3rd_party/collectd-ves-app/ves_app/host.yaml | 214 | ||||
-rw-r--r-- | 3rd_party/collectd-ves-app/ves_app/normalizer.py | 598 | ||||
-rw-r--r-- | 3rd_party/collectd-ves-app/ves_app/ves_app.py | 214 | ||||
-rw-r--r-- | 3rd_party/collectd-ves-app/ves_app/ves_app_config.conf | 12 |
5 files changed, 1038 insertions, 0 deletions
diff --git a/3rd_party/collectd-ves-app/ves_app/__init__.py b/3rd_party/collectd-ves-app/ves_app/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/__init__.py diff --git a/3rd_party/collectd-ves-app/ves_app/host.yaml b/3rd_party/collectd-ves-app/ves_app/host.yaml new file mode 100644 index 00000000..a91574cf --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/host.yaml @@ -0,0 +1,214 @@ +--- +# Common event header definition (required fields and defaults) +commonEventHeader: &commonEventHeader + domain: N/A + eventId: "{system.id}" + eventName: "" + eventType: Info + lastEpochMicrosec: 0 + priority: Normal + reportingEntityId: &reportingEntityId "{system.hostname}" + reportingEntityName: *reportingEntityId + sequence: 0 + sourceName: N/A + startEpochMicrosec: 0 + version: 2.0 + +# Value mapping (used to map collectd notification severity to VES) +collectdSeverityMapping: &collectdSeverityMapping + NOTIF_FAILURE: CRITICAL + NOTIF_WARNING: WARNING + NOTIF_OKAY: NORMAL + +# Measurements definition +Host Measurements: !Measurements + - ITEM-DESC: + event: + commonEventHeader: + <<: *commonEventHeader + eventType: hostOS + domain: measurementsForVfScaling + sourceId: &sourceId "{vl.plugin_instance}" + sourceName: *sourceId + startEpochMicrosec: !Number "{vl.time}" + measurementsForVfScalingFields: + measurementsForVfScalingVersion: 2.0 + additionalFields: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: "/^(?!memory|virt_vcpu|disk_octets|disk_ops|if_packets|if_errors|if_octets|if_dropped).*$/" + - ITEM-DESC: + name: "{vl.type}-{vl.type_instance}-{vl.ds_name}" + value: "{vl.value}" + additionalMeasurements: !ArrayItem + - SELECT: + plugin: "/^(?!virt).*$/" + - INDEX-KEY: + - plugin + - plugin_instance + - ITEM-DESC: + name: "{vl.plugin}-{vl.plugin_instance}" + arrayOfFields: !ArrayItem + - SELECT: + plugin: "{vl.plugin}" + plugin_instance: "{vl.plugin_instance}" + - ITEM-DESC: + name: "{vl.type}-{vl.type_instance}-{vl.ds_name}" + value: "{vl.value}" + measurementInterval: !Number "{vl.interval}" + memoryUsageArray: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: memory + type_instance: total + - ITEM-DESC: + memoryConfigured: !Bytes2Kibibytes "{vl.value}" + vmIdentifier: "{vl.plugin_instance}" + memoryUsed: 0.0 + memoryFree: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: memory + type_instance: rss + - VALUE: !Bytes2Kibibytes "{vl.value}" + - DEFAULT: 0 + cpuUsageArray: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: virt_vcpu + - ITEM-DESC: + cpuIdentifier: "{vl.type_instance}" + percentUsage: !Number "{vl.value}" + vNicPerformanceArray: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_packets + ds_name: rx + - ITEM-DESC: + valuesAreSuspect: "true" + vNicIdentifier: "{vl.type_instance}" + receivedTotalPacketsAccumulated: !Number "{vl.value}" + transmittedTotalPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_packets + type_instance: "{vl.type_instance}" + ds_name: tx + receivedOctetsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_octets + type_instance: "{vl.type_instance}" + ds_name: rx + transmittedOctetsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_octets + type_instance: "{vl.type_instance}" + ds_name: tx + receivedErrorPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_errors + type_instance: "{vl.type_instance}" + ds_name: rx + transmittedErrorPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_errors + type_instance: "{vl.type_instance}" + ds_name: tx + receivedDiscardedPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_dropped + type_instance: "{vl.type_instance}" + ds_name: rx + transmittedDiscardedPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_dropped + type_instance: "{vl.type_instance}" + ds_name: tx + diskUsageArray: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: disk_octets + ds_name: read + - ITEM-DESC: + diskIdentifier: "{vl.type_instance}" + diskOctetsReadLast: !Number "{vl.value}" + diskOctetsWriteLast: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: disk_octets + type_instance: "{vl.type_instance}" + ds_name: write + diskOpsReadLast: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: disk_ops + type_instance: "{vl.type_instance}" + ds_name: read + diskOpsWriteLast: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: disk_ops + type_instance: "{vl.type_instance}" + ds_name: write + - SELECT: + plugin: virt + type_instance: virt_cpu_total + +Virt Events: !Events + - ITEM-DESC: + event: + commonEventHeader: &event_commonEventHeader + <<: *commonEventHeader + domain: fault + eventType: Notification + sourceId: &event_sourceId "{n.plugin_instance}" + sourceName: *event_sourceId + lastEpochMicrosec: !Number "{n.time}" + startEpochMicrosec: !Number "{n.time}" + faultFields: &faultFields + alarmInterfaceA: "{n.plugin}-{n.plugin_instance}" + alarmCondition: "{n.message}" + eventSeverity: !MapValue + VALUE: "{n.severity}" + TO: *collectdSeverityMapping + eventSourceType: hypervisor + faultFieldsVersion: 1.1 + specificProblem: "{n.plugin_instance}-{n.type_instance}" + vfStatus: Active + - CONDITION: + plugin: virt + +Host Events: !Events + - ITEM-DESC: + event: + commonEventHeader: + <<: *event_commonEventHeader + sourceId: "{system.hostname}" + sourceName: "{system.hostname}" + faultFields: + <<: *faultFields + eventSourceType: host + - CONDITION: + plugin: "/^(?!virt).*$/" diff --git a/3rd_party/collectd-ves-app/ves_app/normalizer.py b/3rd_party/collectd-ves-app/ves_app/normalizer.py new file mode 100644 index 00000000..899c850b --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/normalizer.py @@ -0,0 +1,598 @@ +# +# Copyright(c) 2017 Intel Corporation. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com> +# + +import yaml +import logging +import datetime +import time +from threading import RLock +from threading import Timer +from threading import Thread +import re + +# import YAML loader +try: + from yaml import CLoader as Loader +except ImportError: + from yaml import Loader + +# import synchronized queue +try: + # python 2.x + import Queue as queue +except ImportError: + # python 3.x + import queue + + +class Config(object): + """Configuration class used to pass config option into YAML file""" + + def __init__(self, interval): + self.interval = interval + + +class System(object): + """System class which provides information like host, time etc., into YAML + file""" + + def __init__(self): + self.hostname = 'localhost' + self._id = 0 + + @property + def id(self): + self._id = self._id + 1 + return self._id + + @property + def time(self): + return time.time() + + @property + def date(self): + return datetime.date.today().isoformat() + + +class ItemIterator(object): + """Item iterator returned by Collector class""" + + def __init__(self, collector, items): + """Item iterator init""" + logging.debug('{}:__init__()'.format(self.__class__.__name__)) + self._items = items + self._collector = collector + self._index = 0 + + def next(self): + """Returns next item from the list""" + if self._index == len(self._items): + raise StopIteration + curr_index = self._index + self._index = curr_index + 1 + return self.items[curr_index] + + def __getitem__(self, key): + """get item by index""" + return self._items[key] + + def __len__(self): + """Return length of elements""" + return len(self._items) + + def __del__(self): + """Destroy iterator and unlock the collector""" + logging.debug('{}:__del__()'.format(self.__class__.__name__)) + self._collector.unlock() + + +class ItemObject(object): + """Item object returned by Collector class""" + + def __init__(self, collector, hash_): + """Item object init""" + logging.debug('{}:__init__()'.format(self.__class__.__name__)) + super(ItemObject, self).__setattr__('_collector', collector) + super(ItemObject, self).__setattr__('_hash', hash_) + + def __setattr__(self, name, value): + t, item = self._collector._metrics[self._hash] + logging.debug('{}:__setattr__(name={}, value={})'.format( + self.__class__.__name__, name, value)) + setattr(item, name, value) + self._collector._metrics[self._hash] = (time.time(), item) + + def __del__(self): + """Destroy item object and unlock the collector""" + logging.debug('{}:__del__()'.format(self.__class__.__name__)) + self._collector.unlock() + + +class Collector(object): + """Thread-safe collector with aging feature""" + + def __init__(self, age_timeout): + """Initialization""" + self._metrics = {} + self._lock = RLock() + self._age_timeout = age_timeout + self._start_age_timer() + + def _start_age_timer(self): + """Start age timer""" + self._age_timer = Timer(self._age_timeout, self._on_timer) + self._age_timer.start() + + def _stop_age_timer(self): + """Stop age timer""" + self._age_timer.cancel() + + def _on_timer(self): + """Age timer""" + self._start_age_timer() + self._check_aging() + + def _check_aging(self): + """Check aging time for all items""" + self.lock() + for data_hash, data in self._metrics.items(): + age, item = data + if ((time.time() - age) >= self._age_timeout): + # aging time has expired, remove the item from the collector + logging.debug('{}:_check_aging():value={}'.format( + self.__class__.__name__, item)) + self._metrics.pop(data_hash) + del(item) + self.unlock() + + def lock(self): + """Lock the collector""" + logging.debug('{}:lock()'.format(self.__class__.__name__)) + self._lock.acquire() + + def unlock(self): + """Unlock the collector""" + logging.debug('{}:unlock()'.format(self.__class__.__name__)) + self._lock.release() + + def get(self, hash_): + self.lock() + if hash_ in self._metrics: + return ItemObject(self, hash_) + self.unlock() + return None + + def add(self, item): + """Add an item into the collector""" + self.lock() + logging.debug('{}:add(item={})'.format(self.__class__.__name__, item)) + self._metrics[hash(item)] = (time.time(), item) + self.unlock() + + def items(self, select_list=[]): + """Returns locked (safe) item iterator""" + metrics = [] + self.lock() + for k, item in self._metrics.items(): + _, value = item + for select in select_list: + if value.match(**select): + metrics.append(value) + return ItemIterator(self, metrics) + + def destroy(self): + """Destroy the collector""" + self._stop_age_timer() + + +class CollectdData(object): + """Base class for Collectd data""" + + def __init__(self, host=None, plugin=None, plugin_instance=None, + type_=None, type_instance=None, time_=None): + """Class initialization""" + self.host = host + self.plugin = plugin + self.plugin_instance = plugin_instance + self.type_instance = type_instance + self.type = type_ + self.time = time_ + + @classmethod + def is_regular_expression(cls, expr): + return True if expr[0] == '/' and expr[-1] == '/' else False + + def match(self, **kargs): + # compare the metric + for key, value in kargs.items(): + if self.is_regular_expression(value): + if re.match(value[1:-1], getattr(self, key)) is None: + return False + elif value != getattr(self, key): + return False + # return match event if kargs is empty + return True + + +class CollectdNotification(CollectdData): + """Collectd notification""" + + def __init__(self, host=None, plugin=None, plugin_instance=None, + type_=None, type_instance=None, severity=None, message=None): + super(CollectdNotification, self).__init__( + host, plugin, plugin_instance, type_, type_instance) + self.severity = severity + self.message = message + + def __repr__(self): + return '{}(host={}, plugin={}, plugin_instance={}, type={},' \ + 'type_instance={}, severity={}, message={}, time={})'.format( + self.__class__.__name__, self.host, self.plugin, + self.plugin_instance, self.type, self.type_instance, + self.severity, self.message, time) + + +class CollectdValue(CollectdData): + """Collectd value""" + + def __init__(self, host=None, plugin=None, plugin_instance=None, + type_=None, type_instance=None, ds_name='value', value=None, + interval=None): + super(CollectdValue, self).__init__( + host, plugin, plugin_instance, type_, type_instance) + self.value = value + self.ds_name = ds_name + self.interval = interval + + @classmethod + def hash_gen(cls, host, plugin, plugin_instance, type_, + type_instance, ds_name): + return hash((host, plugin, plugin_instance, type_, + type_instance, ds_name)) + + def __eq__(self, other): + return hash(self) == hash(other) and self.value == other.value + + def __hash__(self): + return self.hash_gen(self.host, self.plugin, self.plugin_instance, + self.type, self.type_instance, self.ds_name) + + def __repr__(self): + return '{}(host={}, plugin={}, plugin_instance={}, type={},' \ + 'type_instance={}, ds_name={}, value={}, time={})'.format( + self.__class__.__name__, self.host, self.plugin, + self.plugin_instance, self.type, self.type_instance, + self.ds_name, self.value, self.time) + + +class Item(yaml.YAMLObject): + """Base class to process tags like ArrayItem/ValueItem""" + + @classmethod + def format_node(cls, mapping, metric): + if mapping.tag in [ + 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag, + Number.yaml_tag]: + return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric)) + elif mapping.tag == 'tag:yaml.org,2002:map': + values = [] + for key, value in mapping.value: + values.append((yaml.ScalarNode(key.tag, key.value), + cls.format_node(value, metric))) + return yaml.MappingNode(mapping.tag, values) + elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]: + values = [] + for seq in mapping.value: + map_values = list() + for key, value in seq.value: + if key.value == 'SELECT': + map_values.append((yaml.ScalarNode(key.tag, key.value), + cls.format_node(value, metric))) + else: + map_values.append((yaml.ScalarNode(key.tag, key.value), + value)) + values.append(yaml.MappingNode(seq.tag, map_values)) + return yaml.SequenceNode(mapping.tag, values) + elif mapping.tag in [MapValue.yaml_tag]: + values = [] + for key, value in mapping.value: + if key.value == 'VALUE': + values.append((yaml.ScalarNode(key.tag, key.value), + cls.format_node(value, metric))) + else: + values.append((yaml.ScalarNode(key.tag, key.value), value)) + return yaml.MappingNode(mapping.tag, values) + return mapping + + +class ValueItem(Item): + """Class to process VlaueItem tag""" + yaml_tag = u'!ValueItem' + + @classmethod + def from_yaml(cls, loader, node): + logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader)) + default, select, value_desc = None, list(), None + # find value description + for elem in node.value: + for key, value in elem.value: + if key.value == 'VALUE': + assert value_desc is None, "VALUE key already set" + value_desc = value + if key.value == 'SELECT': + select.append(loader.construct_mapping(value)) + if key.value == 'DEFAULT': + assert default is None, "DEFAULT key already set" + default = loader.construct_object(value) + # if VALUE key isn't given, use default VALUE key + # format: `VALUE: !Number '{vl.value}'` + if value_desc is None: + value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}') + # select collectd metric based on SELECT condition + metrics = loader.collector.items(select) + assert len(metrics) < 2, \ + 'Wrong SELECT condition, selected {} metrics'.format(len(metrics)) + if len(metrics) > 0: + item = cls.format_node(value_desc, {'vl': metrics[0], + 'system': loader.system}) + return loader.construct_object(item) + # nothing has been found by SELECT condition, set to DEFAULT value. + assert default is not None, \ + "No metrics selected by SELECT condition and DEFAULT key isn't set" + return default + + +class ArrayItem(Item): + """Class to process ArrayItem tag""" + yaml_tag = u'!ArrayItem' + + @classmethod + def from_yaml(cls, loader, node): + logging.debug('{}:process(loader={}, node={})'.format(cls.__name__, + loader, node)) + # e.g.: + # SequenceNode(tag=u'!ArrayItem', value=[ + # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ + # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'), + # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ + # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'), + # , ...) + # ]), ... + # ), (key, value), ... ]) + # , ... ]) + assert isinstance(node, yaml.SequenceNode), \ + "{} tag isn't YAML array".format(cls.__name__) + select, index_keys, items, item_desc = list(), list(), list(), None + for elem in node.value: + for key, value in elem.value: + if key.value == 'ITEM-DESC': + assert item_desc is None, "ITEM-DESC key already set" + item_desc = value + if key.value == 'INDEX-KEY': + assert len(index_keys) == 0, "INDEX-KEY key already set" + index_keys = loader.construct_sequence(value) + if key.value == 'SELECT': + select.append(loader.construct_mapping(value)) + # validate item description + assert item_desc is not None, "Mandatory ITEM-DESC key isn't set" + assert len(select) > 0 or len(index_keys) > 0, \ + "Mandatory key (INDEX-KEY or SELECT) isn't set" + metrics = loader.collector.items(select) + # select metrics based on INDEX-KEY provided + if len(index_keys) > 0: + metric_set = set() + for metric in metrics: + value_params = {} + for key in index_keys: + value_params[key] = getattr(metric, key) + metric_set.add(CollectdValue(**value_params)) + metrics = list(metric_set) + # build items based on SELECT and/or INDEX-KEY criteria + for metric in metrics: + item = cls.format_node(item_desc, + {'vl': metric, 'system': loader.system, + 'config': loader.config}) + items.append(loader.construct_mapping(item)) + return items + + +class Measurements(ArrayItem): + """Class to process Measurements tag""" + yaml_tag = u'!Measurements' + + +class Events(Item): + """Class to process Events tag""" + yaml_tag = u'!Events' + + @classmethod + def from_yaml(cls, loader, node): + condition, item_desc = dict(), None + for elem in node.value: + for key, value in elem.value: + if key.value == 'ITEM-DESC': + item_desc = value + if key.value == 'CONDITION': + condition = loader.construct_mapping(value) + assert item_desc is not None, "Mandatory ITEM-DESC key isn't set" + if loader.notification.match(**condition): + item = cls.format_node(item_desc, { + 'n': loader.notification, 'system': loader.system}) + return loader.construct_mapping(item) + return None + + +class Bytes2Kibibytes(yaml.YAMLObject): + """Class to process Bytes2Kibibytes tag""" + yaml_tag = u'!Bytes2Kibibytes' + + @classmethod + def from_yaml(cls, loader, node): + return round(float(node.value) / 1024.0, 3) + + +class Number(yaml.YAMLObject): + """Class to process Number tag""" + yaml_tag = u'!Number' + + @classmethod + def from_yaml(cls, loader, node): + try: + return int(node.value) + except ValueError: + return float(node.value) + + +class MapValue(yaml.YAMLObject): + """Class to process MapValue tag""" + yaml_tag = u'!MapValue' + + @classmethod + def from_yaml(cls, loader, node): + mapping, val = None, None + for key, value in node.value: + if key.value == 'TO': + mapping = loader.construct_mapping(value) + if key.value == 'VALUE': + val = loader.construct_object(value) + assert mapping is not None, "Mandatory TO key isn't set" + assert val is not None, "Mandatory VALUE key isn't set" + assert val in mapping, \ + 'Value "{}" cannot be mapped to any of {} values'.format( + val, mapping.keys()) + return mapping[val] + + +class Normalizer(object): + """Normalization class which handles events and measurements""" + + def __init__(self): + """Init""" + self.interval = None + self.collector = None + self.system = None + self.queue = None + self.timer = None + + @classmethod + def read_configuration(cls, config_file): + """read YAML configuration file""" + # load YAML events/measurements definition + f = open(config_file, 'r') + doc_yaml = yaml.compose(f) + f.close() + # split events & measurements definitions + measurements, events = list(), list() + for key, value in doc_yaml.value: + if value.tag == Measurements.yaml_tag: + measurements.append((key, value)) + if value.tag == Events.yaml_tag: + events.append((key, value)) + measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', + measurements) + measurements_stream = yaml.serialize(measurements_yaml) + events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events) + events_stream = yaml.serialize(events_yaml) + # return event & measurements definition + return events_stream, measurements_stream + + def initialize(self, config_file, interval): + """Initialize the class""" + e, m = self.read_configuration(config_file) + self.measurements_stream = m + self.events_stream = e + self.system = System() + self.config = Config(interval) + self.interval = interval + # start collector with aging time = double interval + self.collector = Collector(interval * 2) + # initialize event thread + self.queue = queue.Queue() + self.event_thread = Thread(target=self.event_worker) + self.event_thread.daemon = True + self.event_thread.start() + # initialize measurements timer + self.start_timer() + + def destroy(self): + """Destroy the class""" + self.collector.destroy() + self.post_event(None) # send stop event + self.event_thread.join() + self.stop_timer() + + def start_timer(self): + """Start measurements timer""" + self.timer = Timer(self.interval, self.on_timer) + self.timer.start() + + def stop_timer(self): + """Stop measurements timer""" + self.timer.cancel() + + def on_timer(self): + """Measurements timer""" + self.start_timer() + self.process_measurements() + + def event_worker(self): + """Event worker""" + while True: + event = self.queue.get() + if isinstance(event, CollectdNotification): + self.process_notify(event) + continue + # exit for the worker + break + + def get_collector(self): + """Get metric collector reference""" + return self.collector + + def process_measurements(self): + """Process measurements""" + loader = Loader(self.measurements_stream) + setattr(loader, 'collector', self.collector) + setattr(loader, 'system', self.system) + setattr(loader, 'config', self.config) + measurements = loader.get_data() + for measurement_name in measurements: + logging.debug('Process "{}" measurements: {}'.format( + measurement_name, measurements[measurement_name])) + for measurement in measurements[measurement_name]: + self.send_data(measurement) + + def process_notify(self, notification): + """Process events""" + loader = Loader(self.events_stream) + setattr(loader, 'notification', notification) + setattr(loader, 'system', self.system) + notifications = loader.get_data() + for notify_name in notifications: + logging.debug('Process "{}" notification'.format(notify_name)) + if notifications[notify_name] is not None: + self.send_data(notifications[notify_name]) + + def send_data(self, data): + """Send data""" + assert False, 'send_data() is abstract function and MUST be overridden' + + def post_event(self, notification): + """Post notification into the queue to process""" + self.queue.put(notification) diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app.py b/3rd_party/collectd-ves-app/ves_app/ves_app.py new file mode 100644 index 00000000..105c66e2 --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/ves_app.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import sys +import base64 +import ConfigParser +import logging +import argparse + +from distutils.util import strtobool +from kafka import KafkaConsumer + +from normalizer import Normalizer +from normalizer import CollectdValue + +try: + # For Python 3.0 and later + import urllib.request as url +except ImportError: + # Fall back to Python 2's urllib2 + import urllib2 as url + + +class VESApp(Normalizer): + """VES Application""" + + def __init__(self): + """Application initialization""" + self._app_config = { + 'Domain': '127.0.0.1', + 'Port': 30000, + 'Path': '', + 'Username': '', + 'Password': '', + 'Topic': '', + 'UseHttps': False, + 'SendEventInterval': 20.0, + 'ApiVersion': 5.1, + 'KafkaPort': 9092, + 'KafkaBroker': 'localhost' + } + + def send_data(self, event): + """Send event to VES""" + server_url = "http{}://{}:{}{}/eventListener/v{}{}".format( + 's' if self._app_config['UseHttps'] else '', + self._app_config['Domain'], int(self._app_config['Port']), + '{}'.format('/{}'.format(self._app_config['Path']) if len( + self._app_config['Path']) > 0 else ''), + int(self._app_config['ApiVersion']), '{}'.format( + '/{}'.format(self._app_config['Topic']) if len( + self._app_config['Topic']) > 0 else '')) + logging.info('Vendor Event Listener is at: {}'.format(server_url)) + credentials = base64.b64encode('{}:{}'.format( + self._app_config['Username'], + self._app_config['Password']).encode()).decode() + logging.info('Authentication credentials are: {}'.format(credentials)) + try: + request = url.Request(server_url) + request.add_header('Authorization', 'Basic {}'.format(credentials)) + request.add_header('Content-Type', 'application/json') + event_str = json.dumps(event).encode() + logging.debug("Sending {} to {}".format(event_str, server_url)) + url.urlopen(request, event_str, timeout=1) + logging.debug("Sent data to {} successfully".format(server_url)) + except url.HTTPError as e: + logging.error('Vendor Event Listener exception: {}'.format(e)) + except url.URLError as e: + logging.error( + 'Vendor Event Listener is is not reachable: {}'.format(e)) + except Exception as e: + logging.error('Vendor Event Listener error: {}'.format(e)) + + def config(self, config): + """VES option configuration""" + for key, value in config.items('config'): + if key in self._app_config: + try: + if type(self._app_config[key]) == int: + value = int(value) + elif type(self._app_config[key]) == float: + value = float(value) + elif type(self._app_config[key]) == bool: + value = bool(strtobool(value)) + + if isinstance(value, type(self._app_config[key])): + self._app_config[key] = value + else: + logging.error("Type mismatch with %s" % key) + sys.exit() + except ValueError: + logging.error("Incorrect value type for %s" % key) + sys.exit() + else: + logging.error("Incorrect key configuration %s" % key) + sys.exit() + + def init(self, configfile, schema_file): + if configfile is not None: + # read VES configuration file if provided + config = ConfigParser.ConfigParser() + config.optionxform = lambda option: option + config.read(configfile) + self.config(config) + # initialize normalizer + self.initialize(schema_file, self._app_config['SendEventInterval']) + + def run(self): + """Consumer JSON data from kafka broker""" + kafka_server = '{}:{}'.format( + self._app_config.get('KafkaBroker'), + self._app_config.get('KafkaPort')) + consumer = KafkaConsumer( + 'collectd', bootstrap_servers=kafka_server, + auto_offset_reset='latest', enable_auto_commit=False, + value_deserializer=lambda m: json.loads(m.decode('ascii'))) + + for message in consumer: + for kafka_data in message.value: + # { + # u'dstypes': [u'derive'], + # u'plugin': u'cpu', + # u'dsnames': [u'value'], + # u'interval': 10.0, + # u'host': u'localhost', + # u'values': [99.9978996416267], + # u'time': 1502114956.244, + # u'plugin_instance': u'44', + # u'type_instance': u'idle', + # u'type': u'cpu' + # } + logging.debug('{}:run():data={}'.format( + self.__class__.__name__, kafka_data)) + for ds_name in kafka_data['dsnames']: + index = kafka_data['dsnames'].index(ds_name) + val_hash = CollectdValue.hash_gen( + kafka_data['host'], kafka_data['plugin'], + kafka_data['plugin_instance'], kafka_data['type'], + kafka_data['type_instance'], ds_name) + collector = self.get_collector() + val = collector.get(val_hash) + if val: + # update the value + val.value = kafka_data['values'][index] + val.time = kafka_data['time'] + del(val) + else: + # add new value into the collector + val = CollectdValue() + val.host = kafka_data['host'] + val.plugin = kafka_data['plugin'] + val.plugin_instance = kafka_data['plugin_instance'] + val.type = kafka_data['type'] + val.type_instance = kafka_data['type_instance'] + val.value = kafka_data['values'][index] + val.interval = kafka_data['interval'] + val.time = kafka_data['time'] + val.ds_name = ds_name + collector.add(val) + + +def main(): + # Parsing cmdline options + parser = argparse.ArgumentParser() + parser.add_argument("--events-schema", dest="schema", required=True, + help="YAML events schema definition", metavar="FILE") + parser.add_argument("--config", dest="configfile", default=None, + help="Specify config file", metavar="FILE") + parser.add_argument("--loglevel", dest="level", default='INFO', + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], + help="Specify log level (default: %(default)s)", + metavar="LEVEL") + parser.add_argument("--logfile", dest="logfile", default='ves_app.log', + help="Specify log file (default: %(default)s)", + metavar="FILE") + args = parser.parse_args() + + # Create log file + logging.basicConfig(filename=args.logfile, + format='%(asctime)s %(message)s', + level=args.level) + if args.configfile is None: + logging.warning("No configfile specified, using default options") + + # Create Application Instance + application_instance = VESApp() + application_instance.init(args.configfile, args.schema) + + try: + # Run the plugin + application_instance.run() + except KeyboardInterrupt: + logging.info(" - Ctrl-C handled, exiting gracefully") + except Exception as e: + logging.error('{}, {}'.format(type(e), e)) + finally: + application_instance.destroy() + sys.exit() + + +if __name__ == '__main__': + main() diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf new file mode 100644 index 00000000..aee0816c --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf @@ -0,0 +1,12 @@ +[config] +Domain = 127.0.0.1 +Port = 30000 +Path = vendor_event_listener +Topic = example_vnf +UseHttps = false +Username = +Password = +SendEventInterval = 20 +ApiVersion = 3 +KafkaPort = 9092 +KafkaBroker = localhost |