diff options
Diffstat (limited to '3rd_party/collectd-ves-app/ves_app/normalizer.py')
-rw-r--r-- | 3rd_party/collectd-ves-app/ves_app/normalizer.py | 598 |
1 files changed, 598 insertions, 0 deletions
diff --git a/3rd_party/collectd-ves-app/ves_app/normalizer.py b/3rd_party/collectd-ves-app/ves_app/normalizer.py new file mode 100644 index 00000000..899c850b --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/normalizer.py @@ -0,0 +1,598 @@ +# +# Copyright(c) 2017 Intel Corporation. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com> +# + +import yaml +import logging +import datetime +import time +from threading import RLock +from threading import Timer +from threading import Thread +import re + +# import YAML loader +try: + from yaml import CLoader as Loader +except ImportError: + from yaml import Loader + +# import synchronized queue +try: + # python 2.x + import Queue as queue +except ImportError: + # python 3.x + import queue + + +class Config(object): + """Configuration class used to pass config option into YAML file""" + + def __init__(self, interval): + self.interval = interval + + +class System(object): + """System class which provides information like host, time etc., into YAML + file""" + + def __init__(self): + self.hostname = 'localhost' + self._id = 0 + + @property + def id(self): + self._id = self._id + 1 + return self._id + + @property + def time(self): + return time.time() + + @property + def date(self): + return datetime.date.today().isoformat() + + +class ItemIterator(object): + """Item iterator returned by Collector class""" + + def __init__(self, collector, items): + """Item iterator init""" + logging.debug('{}:__init__()'.format(self.__class__.__name__)) + self._items = items + self._collector = collector + self._index = 0 + + def next(self): + """Returns next item from the list""" + if self._index == len(self._items): + raise StopIteration + curr_index = self._index + self._index = curr_index + 1 + return self.items[curr_index] + + def __getitem__(self, key): + """get item by index""" + return self._items[key] + + def __len__(self): + """Return length of elements""" + return len(self._items) + + def __del__(self): + """Destroy iterator and unlock the collector""" + logging.debug('{}:__del__()'.format(self.__class__.__name__)) + self._collector.unlock() + + +class ItemObject(object): + """Item object returned by Collector class""" + + def __init__(self, collector, hash_): + """Item object init""" + logging.debug('{}:__init__()'.format(self.__class__.__name__)) + super(ItemObject, self).__setattr__('_collector', collector) + super(ItemObject, self).__setattr__('_hash', hash_) + + def __setattr__(self, name, value): + t, item = self._collector._metrics[self._hash] + logging.debug('{}:__setattr__(name={}, value={})'.format( + self.__class__.__name__, name, value)) + setattr(item, name, value) + self._collector._metrics[self._hash] = (time.time(), item) + + def __del__(self): + """Destroy item object and unlock the collector""" + logging.debug('{}:__del__()'.format(self.__class__.__name__)) + self._collector.unlock() + + +class Collector(object): + """Thread-safe collector with aging feature""" + + def __init__(self, age_timeout): + """Initialization""" + self._metrics = {} + self._lock = RLock() + self._age_timeout = age_timeout + self._start_age_timer() + + def _start_age_timer(self): + """Start age timer""" + self._age_timer = Timer(self._age_timeout, self._on_timer) + self._age_timer.start() + + def _stop_age_timer(self): + """Stop age timer""" + self._age_timer.cancel() + + def _on_timer(self): + """Age timer""" + self._start_age_timer() + self._check_aging() + + def _check_aging(self): + """Check aging time for all items""" + self.lock() + for data_hash, data in self._metrics.items(): + age, item = data + if ((time.time() - age) >= self._age_timeout): + # aging time has expired, remove the item from the collector + logging.debug('{}:_check_aging():value={}'.format( + self.__class__.__name__, item)) + self._metrics.pop(data_hash) + del(item) + self.unlock() + + def lock(self): + """Lock the collector""" + logging.debug('{}:lock()'.format(self.__class__.__name__)) + self._lock.acquire() + + def unlock(self): + """Unlock the collector""" + logging.debug('{}:unlock()'.format(self.__class__.__name__)) + self._lock.release() + + def get(self, hash_): + self.lock() + if hash_ in self._metrics: + return ItemObject(self, hash_) + self.unlock() + return None + + def add(self, item): + """Add an item into the collector""" + self.lock() + logging.debug('{}:add(item={})'.format(self.__class__.__name__, item)) + self._metrics[hash(item)] = (time.time(), item) + self.unlock() + + def items(self, select_list=[]): + """Returns locked (safe) item iterator""" + metrics = [] + self.lock() + for k, item in self._metrics.items(): + _, value = item + for select in select_list: + if value.match(**select): + metrics.append(value) + return ItemIterator(self, metrics) + + def destroy(self): + """Destroy the collector""" + self._stop_age_timer() + + +class CollectdData(object): + """Base class for Collectd data""" + + def __init__(self, host=None, plugin=None, plugin_instance=None, + type_=None, type_instance=None, time_=None): + """Class initialization""" + self.host = host + self.plugin = plugin + self.plugin_instance = plugin_instance + self.type_instance = type_instance + self.type = type_ + self.time = time_ + + @classmethod + def is_regular_expression(cls, expr): + return True if expr[0] == '/' and expr[-1] == '/' else False + + def match(self, **kargs): + # compare the metric + for key, value in kargs.items(): + if self.is_regular_expression(value): + if re.match(value[1:-1], getattr(self, key)) is None: + return False + elif value != getattr(self, key): + return False + # return match event if kargs is empty + return True + + +class CollectdNotification(CollectdData): + """Collectd notification""" + + def __init__(self, host=None, plugin=None, plugin_instance=None, + type_=None, type_instance=None, severity=None, message=None): + super(CollectdNotification, self).__init__( + host, plugin, plugin_instance, type_, type_instance) + self.severity = severity + self.message = message + + def __repr__(self): + return '{}(host={}, plugin={}, plugin_instance={}, type={},' \ + 'type_instance={}, severity={}, message={}, time={})'.format( + self.__class__.__name__, self.host, self.plugin, + self.plugin_instance, self.type, self.type_instance, + self.severity, self.message, time) + + +class CollectdValue(CollectdData): + """Collectd value""" + + def __init__(self, host=None, plugin=None, plugin_instance=None, + type_=None, type_instance=None, ds_name='value', value=None, + interval=None): + super(CollectdValue, self).__init__( + host, plugin, plugin_instance, type_, type_instance) + self.value = value + self.ds_name = ds_name + self.interval = interval + + @classmethod + def hash_gen(cls, host, plugin, plugin_instance, type_, + type_instance, ds_name): + return hash((host, plugin, plugin_instance, type_, + type_instance, ds_name)) + + def __eq__(self, other): + return hash(self) == hash(other) and self.value == other.value + + def __hash__(self): + return self.hash_gen(self.host, self.plugin, self.plugin_instance, + self.type, self.type_instance, self.ds_name) + + def __repr__(self): + return '{}(host={}, plugin={}, plugin_instance={}, type={},' \ + 'type_instance={}, ds_name={}, value={}, time={})'.format( + self.__class__.__name__, self.host, self.plugin, + self.plugin_instance, self.type, self.type_instance, + self.ds_name, self.value, self.time) + + +class Item(yaml.YAMLObject): + """Base class to process tags like ArrayItem/ValueItem""" + + @classmethod + def format_node(cls, mapping, metric): + if mapping.tag in [ + 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag, + Number.yaml_tag]: + return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric)) + elif mapping.tag == 'tag:yaml.org,2002:map': + values = [] + for key, value in mapping.value: + values.append((yaml.ScalarNode(key.tag, key.value), + cls.format_node(value, metric))) + return yaml.MappingNode(mapping.tag, values) + elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]: + values = [] + for seq in mapping.value: + map_values = list() + for key, value in seq.value: + if key.value == 'SELECT': + map_values.append((yaml.ScalarNode(key.tag, key.value), + cls.format_node(value, metric))) + else: + map_values.append((yaml.ScalarNode(key.tag, key.value), + value)) + values.append(yaml.MappingNode(seq.tag, map_values)) + return yaml.SequenceNode(mapping.tag, values) + elif mapping.tag in [MapValue.yaml_tag]: + values = [] + for key, value in mapping.value: + if key.value == 'VALUE': + values.append((yaml.ScalarNode(key.tag, key.value), + cls.format_node(value, metric))) + else: + values.append((yaml.ScalarNode(key.tag, key.value), value)) + return yaml.MappingNode(mapping.tag, values) + return mapping + + +class ValueItem(Item): + """Class to process VlaueItem tag""" + yaml_tag = u'!ValueItem' + + @classmethod + def from_yaml(cls, loader, node): + logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader)) + default, select, value_desc = None, list(), None + # find value description + for elem in node.value: + for key, value in elem.value: + if key.value == 'VALUE': + assert value_desc is None, "VALUE key already set" + value_desc = value + if key.value == 'SELECT': + select.append(loader.construct_mapping(value)) + if key.value == 'DEFAULT': + assert default is None, "DEFAULT key already set" + default = loader.construct_object(value) + # if VALUE key isn't given, use default VALUE key + # format: `VALUE: !Number '{vl.value}'` + if value_desc is None: + value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}') + # select collectd metric based on SELECT condition + metrics = loader.collector.items(select) + assert len(metrics) < 2, \ + 'Wrong SELECT condition, selected {} metrics'.format(len(metrics)) + if len(metrics) > 0: + item = cls.format_node(value_desc, {'vl': metrics[0], + 'system': loader.system}) + return loader.construct_object(item) + # nothing has been found by SELECT condition, set to DEFAULT value. + assert default is not None, \ + "No metrics selected by SELECT condition and DEFAULT key isn't set" + return default + + +class ArrayItem(Item): + """Class to process ArrayItem tag""" + yaml_tag = u'!ArrayItem' + + @classmethod + def from_yaml(cls, loader, node): + logging.debug('{}:process(loader={}, node={})'.format(cls.__name__, + loader, node)) + # e.g.: + # SequenceNode(tag=u'!ArrayItem', value=[ + # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ + # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'), + # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ + # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'), + # , ...) + # ]), ... + # ), (key, value), ... ]) + # , ... ]) + assert isinstance(node, yaml.SequenceNode), \ + "{} tag isn't YAML array".format(cls.__name__) + select, index_keys, items, item_desc = list(), list(), list(), None + for elem in node.value: + for key, value in elem.value: + if key.value == 'ITEM-DESC': + assert item_desc is None, "ITEM-DESC key already set" + item_desc = value + if key.value == 'INDEX-KEY': + assert len(index_keys) == 0, "INDEX-KEY key already set" + index_keys = loader.construct_sequence(value) + if key.value == 'SELECT': + select.append(loader.construct_mapping(value)) + # validate item description + assert item_desc is not None, "Mandatory ITEM-DESC key isn't set" + assert len(select) > 0 or len(index_keys) > 0, \ + "Mandatory key (INDEX-KEY or SELECT) isn't set" + metrics = loader.collector.items(select) + # select metrics based on INDEX-KEY provided + if len(index_keys) > 0: + metric_set = set() + for metric in metrics: + value_params = {} + for key in index_keys: + value_params[key] = getattr(metric, key) + metric_set.add(CollectdValue(**value_params)) + metrics = list(metric_set) + # build items based on SELECT and/or INDEX-KEY criteria + for metric in metrics: + item = cls.format_node(item_desc, + {'vl': metric, 'system': loader.system, + 'config': loader.config}) + items.append(loader.construct_mapping(item)) + return items + + +class Measurements(ArrayItem): + """Class to process Measurements tag""" + yaml_tag = u'!Measurements' + + +class Events(Item): + """Class to process Events tag""" + yaml_tag = u'!Events' + + @classmethod + def from_yaml(cls, loader, node): + condition, item_desc = dict(), None + for elem in node.value: + for key, value in elem.value: + if key.value == 'ITEM-DESC': + item_desc = value + if key.value == 'CONDITION': + condition = loader.construct_mapping(value) + assert item_desc is not None, "Mandatory ITEM-DESC key isn't set" + if loader.notification.match(**condition): + item = cls.format_node(item_desc, { + 'n': loader.notification, 'system': loader.system}) + return loader.construct_mapping(item) + return None + + +class Bytes2Kibibytes(yaml.YAMLObject): + """Class to process Bytes2Kibibytes tag""" + yaml_tag = u'!Bytes2Kibibytes' + + @classmethod + def from_yaml(cls, loader, node): + return round(float(node.value) / 1024.0, 3) + + +class Number(yaml.YAMLObject): + """Class to process Number tag""" + yaml_tag = u'!Number' + + @classmethod + def from_yaml(cls, loader, node): + try: + return int(node.value) + except ValueError: + return float(node.value) + + +class MapValue(yaml.YAMLObject): + """Class to process MapValue tag""" + yaml_tag = u'!MapValue' + + @classmethod + def from_yaml(cls, loader, node): + mapping, val = None, None + for key, value in node.value: + if key.value == 'TO': + mapping = loader.construct_mapping(value) + if key.value == 'VALUE': + val = loader.construct_object(value) + assert mapping is not None, "Mandatory TO key isn't set" + assert val is not None, "Mandatory VALUE key isn't set" + assert val in mapping, \ + 'Value "{}" cannot be mapped to any of {} values'.format( + val, mapping.keys()) + return mapping[val] + + +class Normalizer(object): + """Normalization class which handles events and measurements""" + + def __init__(self): + """Init""" + self.interval = None + self.collector = None + self.system = None + self.queue = None + self.timer = None + + @classmethod + def read_configuration(cls, config_file): + """read YAML configuration file""" + # load YAML events/measurements definition + f = open(config_file, 'r') + doc_yaml = yaml.compose(f) + f.close() + # split events & measurements definitions + measurements, events = list(), list() + for key, value in doc_yaml.value: + if value.tag == Measurements.yaml_tag: + measurements.append((key, value)) + if value.tag == Events.yaml_tag: + events.append((key, value)) + measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', + measurements) + measurements_stream = yaml.serialize(measurements_yaml) + events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events) + events_stream = yaml.serialize(events_yaml) + # return event & measurements definition + return events_stream, measurements_stream + + def initialize(self, config_file, interval): + """Initialize the class""" + e, m = self.read_configuration(config_file) + self.measurements_stream = m + self.events_stream = e + self.system = System() + self.config = Config(interval) + self.interval = interval + # start collector with aging time = double interval + self.collector = Collector(interval * 2) + # initialize event thread + self.queue = queue.Queue() + self.event_thread = Thread(target=self.event_worker) + self.event_thread.daemon = True + self.event_thread.start() + # initialize measurements timer + self.start_timer() + + def destroy(self): + """Destroy the class""" + self.collector.destroy() + self.post_event(None) # send stop event + self.event_thread.join() + self.stop_timer() + + def start_timer(self): + """Start measurements timer""" + self.timer = Timer(self.interval, self.on_timer) + self.timer.start() + + def stop_timer(self): + """Stop measurements timer""" + self.timer.cancel() + + def on_timer(self): + """Measurements timer""" + self.start_timer() + self.process_measurements() + + def event_worker(self): + """Event worker""" + while True: + event = self.queue.get() + if isinstance(event, CollectdNotification): + self.process_notify(event) + continue + # exit for the worker + break + + def get_collector(self): + """Get metric collector reference""" + return self.collector + + def process_measurements(self): + """Process measurements""" + loader = Loader(self.measurements_stream) + setattr(loader, 'collector', self.collector) + setattr(loader, 'system', self.system) + setattr(loader, 'config', self.config) + measurements = loader.get_data() + for measurement_name in measurements: + logging.debug('Process "{}" measurements: {}'.format( + measurement_name, measurements[measurement_name])) + for measurement in measurements[measurement_name]: + self.send_data(measurement) + + def process_notify(self, notification): + """Process events""" + loader = Loader(self.events_stream) + setattr(loader, 'notification', notification) + setattr(loader, 'system', self.system) + notifications = loader.get_data() + for notify_name in notifications: + logging.debug('Process "{}" notification'.format(notify_name)) + if notifications[notify_name] is not None: + self.send_data(notifications[notify_name]) + + def send_data(self, data): + """Send data""" + assert False, 'send_data() is abstract function and MUST be overridden' + + def post_event(self, notification): + """Post notification into the queue to process""" + self.queue.put(notification) |