aboutsummaryrefslogtreecommitdiffstats
path: root/3rd_party/collectd-ves-app
diff options
context:
space:
mode:
Diffstat (limited to '3rd_party/collectd-ves-app')
-rw-r--r--3rd_party/collectd-ves-app/LICENSE23
-rw-r--r--3rd_party/collectd-ves-app/PSF_LICENSE_AGREEMENT36
-rw-r--r--3rd_party/collectd-ves-app/ves_app/__init__.py0
-rw-r--r--3rd_party/collectd-ves-app/ves_app/host.yaml214
-rw-r--r--3rd_party/collectd-ves-app/ves_app/normalizer.py598
-rw-r--r--3rd_party/collectd-ves-app/ves_app/ves_app.py214
-rw-r--r--3rd_party/collectd-ves-app/ves_app/ves_app_config.conf12
7 files changed, 1097 insertions, 0 deletions
diff --git a/3rd_party/collectd-ves-app/LICENSE b/3rd_party/collectd-ves-app/LICENSE
new file mode 100644
index 00000000..804ec198
--- /dev/null
+++ b/3rd_party/collectd-ves-app/LICENSE
@@ -0,0 +1,23 @@
+Copyright Intel Corporation 2016-2017
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+of the Software, and to permit persons to whom the Software is furnished to do
+so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/3rd_party/collectd-ves-app/PSF_LICENSE_AGREEMENT b/3rd_party/collectd-ves-app/PSF_LICENSE_AGREEMENT
new file mode 100644
index 00000000..666648bb
--- /dev/null
+++ b/3rd_party/collectd-ves-app/PSF_LICENSE_AGREEMENT
@@ -0,0 +1,36 @@
+PSF LICENSE AGREEMENT FOR PYTHON 2.7.1
+1. This LICENSE AGREEMENT is between the Python Software Foundation (“PSF”),
+ and the Individual or Organization (“Licensee”) accessing and otherwise
+ using Python 2.7.1 software in source or binary form and its associated
+ documentation.
+2. Subject to the terms and conditions of this License Agreement, PSF hereby
+ grants Licensee a nonexclusive, royalty-free, world-wide license to
+ reproduce, analyze, test, perform and/or display publicly, prepare
+ derivative works, distribute, and otherwise use Python 2.7.1 alone or in any
+ derivative version, provided, however, that PSF’s License Agreement and
+ PSF’s notice of copyright, i.e., “Copyright © 2001-2010 Python Software
+ Foundation; All Rights Reserved” are retained in Python 2.7.1 alone or in
+ any derivative version prepared by Licensee.
+3. In the event Licensee prepares a derivative work that is based on or
+ incorporates Python 2.7.1 or any part thereof, and wants to make the
+ derivative work available to others as provided herein, then Licensee hereby
+ agrees to include in any such work a brief summary of the changes made to
+ Python 2.7.1.
+4. PSF is making Python 2.7.1 available to Licensee on an “AS IS” basis. PSF
+ MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF
+ EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND DISCLAIMS ANY REPRESENTATION OR
+ WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT
+ THE USE OF PYTHON 2.7.1 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS.
+5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 2.7.1 FOR
+ ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF
+ MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 2.7.1, OR ANY DERIVATIVE
+ THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+6. This License Agreement will automatically terminate upon a material breach
+ of its terms and conditions.
+7. Nothing in this License Agreement shall be deemed to create any relationship
+ of agency, partnership, or joint venture between PSF and Licensee. This
+ License Agreement does not grant permission to use PSF trademarks or trade name
+ in a trademark sense to endorse or promote products or services of Licensee,
+ or any third party. 8. By copying, installing or otherwise using Python
+ 2.7.1, Licensee agrees to be bound by the terms and conditions of this License
+ Agreement.
diff --git a/3rd_party/collectd-ves-app/ves_app/__init__.py b/3rd_party/collectd-ves-app/ves_app/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/3rd_party/collectd-ves-app/ves_app/__init__.py
diff --git a/3rd_party/collectd-ves-app/ves_app/host.yaml b/3rd_party/collectd-ves-app/ves_app/host.yaml
new file mode 100644
index 00000000..a91574cf
--- /dev/null
+++ b/3rd_party/collectd-ves-app/ves_app/host.yaml
@@ -0,0 +1,214 @@
+---
+# Common event header definition (required fields and defaults)
+commonEventHeader: &commonEventHeader
+ domain: N/A
+ eventId: "{system.id}"
+ eventName: ""
+ eventType: Info
+ lastEpochMicrosec: 0
+ priority: Normal
+ reportingEntityId: &reportingEntityId "{system.hostname}"
+ reportingEntityName: *reportingEntityId
+ sequence: 0
+ sourceName: N/A
+ startEpochMicrosec: 0
+ version: 2.0
+
+# Value mapping (used to map collectd notification severity to VES)
+collectdSeverityMapping: &collectdSeverityMapping
+ NOTIF_FAILURE: CRITICAL
+ NOTIF_WARNING: WARNING
+ NOTIF_OKAY: NORMAL
+
+# Measurements definition
+Host Measurements: !Measurements
+ - ITEM-DESC:
+ event:
+ commonEventHeader:
+ <<: *commonEventHeader
+ eventType: hostOS
+ domain: measurementsForVfScaling
+ sourceId: &sourceId "{vl.plugin_instance}"
+ sourceName: *sourceId
+ startEpochMicrosec: !Number "{vl.time}"
+ measurementsForVfScalingFields:
+ measurementsForVfScalingVersion: 2.0
+ additionalFields: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: "/^(?!memory|virt_vcpu|disk_octets|disk_ops|if_packets|if_errors|if_octets|if_dropped).*$/"
+ - ITEM-DESC:
+ name: "{vl.type}-{vl.type_instance}-{vl.ds_name}"
+ value: "{vl.value}"
+ additionalMeasurements: !ArrayItem
+ - SELECT:
+ plugin: "/^(?!virt).*$/"
+ - INDEX-KEY:
+ - plugin
+ - plugin_instance
+ - ITEM-DESC:
+ name: "{vl.plugin}-{vl.plugin_instance}"
+ arrayOfFields: !ArrayItem
+ - SELECT:
+ plugin: "{vl.plugin}"
+ plugin_instance: "{vl.plugin_instance}"
+ - ITEM-DESC:
+ name: "{vl.type}-{vl.type_instance}-{vl.ds_name}"
+ value: "{vl.value}"
+ measurementInterval: !Number "{vl.interval}"
+ memoryUsageArray: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: memory
+ type_instance: total
+ - ITEM-DESC:
+ memoryConfigured: !Bytes2Kibibytes "{vl.value}"
+ vmIdentifier: "{vl.plugin_instance}"
+ memoryUsed: 0.0
+ memoryFree: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: memory
+ type_instance: rss
+ - VALUE: !Bytes2Kibibytes "{vl.value}"
+ - DEFAULT: 0
+ cpuUsageArray: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: virt_vcpu
+ - ITEM-DESC:
+ cpuIdentifier: "{vl.type_instance}"
+ percentUsage: !Number "{vl.value}"
+ vNicPerformanceArray: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_packets
+ ds_name: rx
+ - ITEM-DESC:
+ valuesAreSuspect: "true"
+ vNicIdentifier: "{vl.type_instance}"
+ receivedTotalPacketsAccumulated: !Number "{vl.value}"
+ transmittedTotalPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_packets
+ type_instance: "{vl.type_instance}"
+ ds_name: tx
+ receivedOctetsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_octets
+ type_instance: "{vl.type_instance}"
+ ds_name: rx
+ transmittedOctetsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_octets
+ type_instance: "{vl.type_instance}"
+ ds_name: tx
+ receivedErrorPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_errors
+ type_instance: "{vl.type_instance}"
+ ds_name: rx
+ transmittedErrorPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_errors
+ type_instance: "{vl.type_instance}"
+ ds_name: tx
+ receivedDiscardedPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_dropped
+ type_instance: "{vl.type_instance}"
+ ds_name: rx
+ transmittedDiscardedPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_dropped
+ type_instance: "{vl.type_instance}"
+ ds_name: tx
+ diskUsageArray: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: disk_octets
+ ds_name: read
+ - ITEM-DESC:
+ diskIdentifier: "{vl.type_instance}"
+ diskOctetsReadLast: !Number "{vl.value}"
+ diskOctetsWriteLast: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: disk_octets
+ type_instance: "{vl.type_instance}"
+ ds_name: write
+ diskOpsReadLast: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: disk_ops
+ type_instance: "{vl.type_instance}"
+ ds_name: read
+ diskOpsWriteLast: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: disk_ops
+ type_instance: "{vl.type_instance}"
+ ds_name: write
+ - SELECT:
+ plugin: virt
+ type_instance: virt_cpu_total
+
+Virt Events: !Events
+ - ITEM-DESC:
+ event:
+ commonEventHeader: &event_commonEventHeader
+ <<: *commonEventHeader
+ domain: fault
+ eventType: Notification
+ sourceId: &event_sourceId "{n.plugin_instance}"
+ sourceName: *event_sourceId
+ lastEpochMicrosec: !Number "{n.time}"
+ startEpochMicrosec: !Number "{n.time}"
+ faultFields: &faultFields
+ alarmInterfaceA: "{n.plugin}-{n.plugin_instance}"
+ alarmCondition: "{n.message}"
+ eventSeverity: !MapValue
+ VALUE: "{n.severity}"
+ TO: *collectdSeverityMapping
+ eventSourceType: hypervisor
+ faultFieldsVersion: 1.1
+ specificProblem: "{n.plugin_instance}-{n.type_instance}"
+ vfStatus: Active
+ - CONDITION:
+ plugin: virt
+
+Host Events: !Events
+ - ITEM-DESC:
+ event:
+ commonEventHeader:
+ <<: *event_commonEventHeader
+ sourceId: "{system.hostname}"
+ sourceName: "{system.hostname}"
+ faultFields:
+ <<: *faultFields
+ eventSourceType: host
+ - CONDITION:
+ plugin: "/^(?!virt).*$/"
diff --git a/3rd_party/collectd-ves-app/ves_app/normalizer.py b/3rd_party/collectd-ves-app/ves_app/normalizer.py
new file mode 100644
index 00000000..899c850b
--- /dev/null
+++ b/3rd_party/collectd-ves-app/ves_app/normalizer.py
@@ -0,0 +1,598 @@
+#
+# Copyright(c) 2017 Intel Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Authors:
+# Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
+#
+
+import yaml
+import logging
+import datetime
+import time
+from threading import RLock
+from threading import Timer
+from threading import Thread
+import re
+
+# import YAML loader
+try:
+ from yaml import CLoader as Loader
+except ImportError:
+ from yaml import Loader
+
+# import synchronized queue
+try:
+ # python 2.x
+ import Queue as queue
+except ImportError:
+ # python 3.x
+ import queue
+
+
+class Config(object):
+ """Configuration class used to pass config option into YAML file"""
+
+ def __init__(self, interval):
+ self.interval = interval
+
+
+class System(object):
+ """System class which provides information like host, time etc., into YAML
+ file"""
+
+ def __init__(self):
+ self.hostname = 'localhost'
+ self._id = 0
+
+ @property
+ def id(self):
+ self._id = self._id + 1
+ return self._id
+
+ @property
+ def time(self):
+ return time.time()
+
+ @property
+ def date(self):
+ return datetime.date.today().isoformat()
+
+
+class ItemIterator(object):
+ """Item iterator returned by Collector class"""
+
+ def __init__(self, collector, items):
+ """Item iterator init"""
+ logging.debug('{}:__init__()'.format(self.__class__.__name__))
+ self._items = items
+ self._collector = collector
+ self._index = 0
+
+ def next(self):
+ """Returns next item from the list"""
+ if self._index == len(self._items):
+ raise StopIteration
+ curr_index = self._index
+ self._index = curr_index + 1
+ return self.items[curr_index]
+
+ def __getitem__(self, key):
+ """get item by index"""
+ return self._items[key]
+
+ def __len__(self):
+ """Return length of elements"""
+ return len(self._items)
+
+ def __del__(self):
+ """Destroy iterator and unlock the collector"""
+ logging.debug('{}:__del__()'.format(self.__class__.__name__))
+ self._collector.unlock()
+
+
+class ItemObject(object):
+ """Item object returned by Collector class"""
+
+ def __init__(self, collector, hash_):
+ """Item object init"""
+ logging.debug('{}:__init__()'.format(self.__class__.__name__))
+ super(ItemObject, self).__setattr__('_collector', collector)
+ super(ItemObject, self).__setattr__('_hash', hash_)
+
+ def __setattr__(self, name, value):
+ t, item = self._collector._metrics[self._hash]
+ logging.debug('{}:__setattr__(name={}, value={})'.format(
+ self.__class__.__name__, name, value))
+ setattr(item, name, value)
+ self._collector._metrics[self._hash] = (time.time(), item)
+
+ def __del__(self):
+ """Destroy item object and unlock the collector"""
+ logging.debug('{}:__del__()'.format(self.__class__.__name__))
+ self._collector.unlock()
+
+
+class Collector(object):
+ """Thread-safe collector with aging feature"""
+
+ def __init__(self, age_timeout):
+ """Initialization"""
+ self._metrics = {}
+ self._lock = RLock()
+ self._age_timeout = age_timeout
+ self._start_age_timer()
+
+ def _start_age_timer(self):
+ """Start age timer"""
+ self._age_timer = Timer(self._age_timeout, self._on_timer)
+ self._age_timer.start()
+
+ def _stop_age_timer(self):
+ """Stop age timer"""
+ self._age_timer.cancel()
+
+ def _on_timer(self):
+ """Age timer"""
+ self._start_age_timer()
+ self._check_aging()
+
+ def _check_aging(self):
+ """Check aging time for all items"""
+ self.lock()
+ for data_hash, data in self._metrics.items():
+ age, item = data
+ if ((time.time() - age) >= self._age_timeout):
+ # aging time has expired, remove the item from the collector
+ logging.debug('{}:_check_aging():value={}'.format(
+ self.__class__.__name__, item))
+ self._metrics.pop(data_hash)
+ del(item)
+ self.unlock()
+
+ def lock(self):
+ """Lock the collector"""
+ logging.debug('{}:lock()'.format(self.__class__.__name__))
+ self._lock.acquire()
+
+ def unlock(self):
+ """Unlock the collector"""
+ logging.debug('{}:unlock()'.format(self.__class__.__name__))
+ self._lock.release()
+
+ def get(self, hash_):
+ self.lock()
+ if hash_ in self._metrics:
+ return ItemObject(self, hash_)
+ self.unlock()
+ return None
+
+ def add(self, item):
+ """Add an item into the collector"""
+ self.lock()
+ logging.debug('{}:add(item={})'.format(self.__class__.__name__, item))
+ self._metrics[hash(item)] = (time.time(), item)
+ self.unlock()
+
+ def items(self, select_list=[]):
+ """Returns locked (safe) item iterator"""
+ metrics = []
+ self.lock()
+ for k, item in self._metrics.items():
+ _, value = item
+ for select in select_list:
+ if value.match(**select):
+ metrics.append(value)
+ return ItemIterator(self, metrics)
+
+ def destroy(self):
+ """Destroy the collector"""
+ self._stop_age_timer()
+
+
+class CollectdData(object):
+ """Base class for Collectd data"""
+
+ def __init__(self, host=None, plugin=None, plugin_instance=None,
+ type_=None, type_instance=None, time_=None):
+ """Class initialization"""
+ self.host = host
+ self.plugin = plugin
+ self.plugin_instance = plugin_instance
+ self.type_instance = type_instance
+ self.type = type_
+ self.time = time_
+
+ @classmethod
+ def is_regular_expression(cls, expr):
+ return True if expr[0] == '/' and expr[-1] == '/' else False
+
+ def match(self, **kargs):
+ # compare the metric
+ for key, value in kargs.items():
+ if self.is_regular_expression(value):
+ if re.match(value[1:-1], getattr(self, key)) is None:
+ return False
+ elif value != getattr(self, key):
+ return False
+ # return match event if kargs is empty
+ return True
+
+
+class CollectdNotification(CollectdData):
+ """Collectd notification"""
+
+ def __init__(self, host=None, plugin=None, plugin_instance=None,
+ type_=None, type_instance=None, severity=None, message=None):
+ super(CollectdNotification, self).__init__(
+ host, plugin, plugin_instance, type_, type_instance)
+ self.severity = severity
+ self.message = message
+
+ def __repr__(self):
+ return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
+ 'type_instance={}, severity={}, message={}, time={})'.format(
+ self.__class__.__name__, self.host, self.plugin,
+ self.plugin_instance, self.type, self.type_instance,
+ self.severity, self.message, time)
+
+
+class CollectdValue(CollectdData):
+ """Collectd value"""
+
+ def __init__(self, host=None, plugin=None, plugin_instance=None,
+ type_=None, type_instance=None, ds_name='value', value=None,
+ interval=None):
+ super(CollectdValue, self).__init__(
+ host, plugin, plugin_instance, type_, type_instance)
+ self.value = value
+ self.ds_name = ds_name
+ self.interval = interval
+
+ @classmethod
+ def hash_gen(cls, host, plugin, plugin_instance, type_,
+ type_instance, ds_name):
+ return hash((host, plugin, plugin_instance, type_,
+ type_instance, ds_name))
+
+ def __eq__(self, other):
+ return hash(self) == hash(other) and self.value == other.value
+
+ def __hash__(self):
+ return self.hash_gen(self.host, self.plugin, self.plugin_instance,
+ self.type, self.type_instance, self.ds_name)
+
+ def __repr__(self):
+ return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
+ 'type_instance={}, ds_name={}, value={}, time={})'.format(
+ self.__class__.__name__, self.host, self.plugin,
+ self.plugin_instance, self.type, self.type_instance,
+ self.ds_name, self.value, self.time)
+
+
+class Item(yaml.YAMLObject):
+ """Base class to process tags like ArrayItem/ValueItem"""
+
+ @classmethod
+ def format_node(cls, mapping, metric):
+ if mapping.tag in [
+ 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
+ Number.yaml_tag]:
+ return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric))
+ elif mapping.tag == 'tag:yaml.org,2002:map':
+ values = []
+ for key, value in mapping.value:
+ values.append((yaml.ScalarNode(key.tag, key.value),
+ cls.format_node(value, metric)))
+ return yaml.MappingNode(mapping.tag, values)
+ elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]:
+ values = []
+ for seq in mapping.value:
+ map_values = list()
+ for key, value in seq.value:
+ if key.value == 'SELECT':
+ map_values.append((yaml.ScalarNode(key.tag, key.value),
+ cls.format_node(value, metric)))
+ else:
+ map_values.append((yaml.ScalarNode(key.tag, key.value),
+ value))
+ values.append(yaml.MappingNode(seq.tag, map_values))
+ return yaml.SequenceNode(mapping.tag, values)
+ elif mapping.tag in [MapValue.yaml_tag]:
+ values = []
+ for key, value in mapping.value:
+ if key.value == 'VALUE':
+ values.append((yaml.ScalarNode(key.tag, key.value),
+ cls.format_node(value, metric)))
+ else:
+ values.append((yaml.ScalarNode(key.tag, key.value), value))
+ return yaml.MappingNode(mapping.tag, values)
+ return mapping
+
+
+class ValueItem(Item):
+ """Class to process VlaueItem tag"""
+ yaml_tag = u'!ValueItem'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
+ default, select, value_desc = None, list(), None
+ # find value description
+ for elem in node.value:
+ for key, value in elem.value:
+ if key.value == 'VALUE':
+ assert value_desc is None, "VALUE key already set"
+ value_desc = value
+ if key.value == 'SELECT':
+ select.append(loader.construct_mapping(value))
+ if key.value == 'DEFAULT':
+ assert default is None, "DEFAULT key already set"
+ default = loader.construct_object(value)
+ # if VALUE key isn't given, use default VALUE key
+ # format: `VALUE: !Number '{vl.value}'`
+ if value_desc is None:
+ value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}')
+ # select collectd metric based on SELECT condition
+ metrics = loader.collector.items(select)
+ assert len(metrics) < 2, \
+ 'Wrong SELECT condition, selected {} metrics'.format(len(metrics))
+ if len(metrics) > 0:
+ item = cls.format_node(value_desc, {'vl': metrics[0],
+ 'system': loader.system})
+ return loader.construct_object(item)
+ # nothing has been found by SELECT condition, set to DEFAULT value.
+ assert default is not None, \
+ "No metrics selected by SELECT condition and DEFAULT key isn't set"
+ return default
+
+
+class ArrayItem(Item):
+ """Class to process ArrayItem tag"""
+ yaml_tag = u'!ArrayItem'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
+ loader, node))
+ # e.g.:
+ # SequenceNode(tag=u'!ArrayItem', value=[
+ # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
+ # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
+ # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
+ # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
+ # , ...)
+ # ]), ...
+ # ), (key, value), ... ])
+ # , ... ])
+ assert isinstance(node, yaml.SequenceNode), \
+ "{} tag isn't YAML array".format(cls.__name__)
+ select, index_keys, items, item_desc = list(), list(), list(), None
+ for elem in node.value:
+ for key, value in elem.value:
+ if key.value == 'ITEM-DESC':
+ assert item_desc is None, "ITEM-DESC key already set"
+ item_desc = value
+ if key.value == 'INDEX-KEY':
+ assert len(index_keys) == 0, "INDEX-KEY key already set"
+ index_keys = loader.construct_sequence(value)
+ if key.value == 'SELECT':
+ select.append(loader.construct_mapping(value))
+ # validate item description
+ assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
+ assert len(select) > 0 or len(index_keys) > 0, \
+ "Mandatory key (INDEX-KEY or SELECT) isn't set"
+ metrics = loader.collector.items(select)
+ # select metrics based on INDEX-KEY provided
+ if len(index_keys) > 0:
+ metric_set = set()
+ for metric in metrics:
+ value_params = {}
+ for key in index_keys:
+ value_params[key] = getattr(metric, key)
+ metric_set.add(CollectdValue(**value_params))
+ metrics = list(metric_set)
+ # build items based on SELECT and/or INDEX-KEY criteria
+ for metric in metrics:
+ item = cls.format_node(item_desc,
+ {'vl': metric, 'system': loader.system,
+ 'config': loader.config})
+ items.append(loader.construct_mapping(item))
+ return items
+
+
+class Measurements(ArrayItem):
+ """Class to process Measurements tag"""
+ yaml_tag = u'!Measurements'
+
+
+class Events(Item):
+ """Class to process Events tag"""
+ yaml_tag = u'!Events'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ condition, item_desc = dict(), None
+ for elem in node.value:
+ for key, value in elem.value:
+ if key.value == 'ITEM-DESC':
+ item_desc = value
+ if key.value == 'CONDITION':
+ condition = loader.construct_mapping(value)
+ assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
+ if loader.notification.match(**condition):
+ item = cls.format_node(item_desc, {
+ 'n': loader.notification, 'system': loader.system})
+ return loader.construct_mapping(item)
+ return None
+
+
+class Bytes2Kibibytes(yaml.YAMLObject):
+ """Class to process Bytes2Kibibytes tag"""
+ yaml_tag = u'!Bytes2Kibibytes'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ return round(float(node.value) / 1024.0, 3)
+
+
+class Number(yaml.YAMLObject):
+ """Class to process Number tag"""
+ yaml_tag = u'!Number'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ try:
+ return int(node.value)
+ except ValueError:
+ return float(node.value)
+
+
+class MapValue(yaml.YAMLObject):
+ """Class to process MapValue tag"""
+ yaml_tag = u'!MapValue'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ mapping, val = None, None
+ for key, value in node.value:
+ if key.value == 'TO':
+ mapping = loader.construct_mapping(value)
+ if key.value == 'VALUE':
+ val = loader.construct_object(value)
+ assert mapping is not None, "Mandatory TO key isn't set"
+ assert val is not None, "Mandatory VALUE key isn't set"
+ assert val in mapping, \
+ 'Value "{}" cannot be mapped to any of {} values'.format(
+ val, mapping.keys())
+ return mapping[val]
+
+
+class Normalizer(object):
+ """Normalization class which handles events and measurements"""
+
+ def __init__(self):
+ """Init"""
+ self.interval = None
+ self.collector = None
+ self.system = None
+ self.queue = None
+ self.timer = None
+
+ @classmethod
+ def read_configuration(cls, config_file):
+ """read YAML configuration file"""
+ # load YAML events/measurements definition
+ f = open(config_file, 'r')
+ doc_yaml = yaml.compose(f)
+ f.close()
+ # split events & measurements definitions
+ measurements, events = list(), list()
+ for key, value in doc_yaml.value:
+ if value.tag == Measurements.yaml_tag:
+ measurements.append((key, value))
+ if value.tag == Events.yaml_tag:
+ events.append((key, value))
+ measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map',
+ measurements)
+ measurements_stream = yaml.serialize(measurements_yaml)
+ events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events)
+ events_stream = yaml.serialize(events_yaml)
+ # return event & measurements definition
+ return events_stream, measurements_stream
+
+ def initialize(self, config_file, interval):
+ """Initialize the class"""
+ e, m = self.read_configuration(config_file)
+ self.measurements_stream = m
+ self.events_stream = e
+ self.system = System()
+ self.config = Config(interval)
+ self.interval = interval
+ # start collector with aging time = double interval
+ self.collector = Collector(interval * 2)
+ # initialize event thread
+ self.queue = queue.Queue()
+ self.event_thread = Thread(target=self.event_worker)
+ self.event_thread.daemon = True
+ self.event_thread.start()
+ # initialize measurements timer
+ self.start_timer()
+
+ def destroy(self):
+ """Destroy the class"""
+ self.collector.destroy()
+ self.post_event(None) # send stop event
+ self.event_thread.join()
+ self.stop_timer()
+
+ def start_timer(self):
+ """Start measurements timer"""
+ self.timer = Timer(self.interval, self.on_timer)
+ self.timer.start()
+
+ def stop_timer(self):
+ """Stop measurements timer"""
+ self.timer.cancel()
+
+ def on_timer(self):
+ """Measurements timer"""
+ self.start_timer()
+ self.process_measurements()
+
+ def event_worker(self):
+ """Event worker"""
+ while True:
+ event = self.queue.get()
+ if isinstance(event, CollectdNotification):
+ self.process_notify(event)
+ continue
+ # exit for the worker
+ break
+
+ def get_collector(self):
+ """Get metric collector reference"""
+ return self.collector
+
+ def process_measurements(self):
+ """Process measurements"""
+ loader = Loader(self.measurements_stream)
+ setattr(loader, 'collector', self.collector)
+ setattr(loader, 'system', self.system)
+ setattr(loader, 'config', self.config)
+ measurements = loader.get_data()
+ for measurement_name in measurements:
+ logging.debug('Process "{}" measurements: {}'.format(
+ measurement_name, measurements[measurement_name]))
+ for measurement in measurements[measurement_name]:
+ self.send_data(measurement)
+
+ def process_notify(self, notification):
+ """Process events"""
+ loader = Loader(self.events_stream)
+ setattr(loader, 'notification', notification)
+ setattr(loader, 'system', self.system)
+ notifications = loader.get_data()
+ for notify_name in notifications:
+ logging.debug('Process "{}" notification'.format(notify_name))
+ if notifications[notify_name] is not None:
+ self.send_data(notifications[notify_name])
+
+ def send_data(self, data):
+ """Send data"""
+ assert False, 'send_data() is abstract function and MUST be overridden'
+
+ def post_event(self, notification):
+ """Post notification into the queue to process"""
+ self.queue.put(notification)
diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app.py b/3rd_party/collectd-ves-app/ves_app/ves_app.py
new file mode 100644
index 00000000..105c66e2
--- /dev/null
+++ b/3rd_party/collectd-ves-app/ves_app/ves_app.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import sys
+import base64
+import ConfigParser
+import logging
+import argparse
+
+from distutils.util import strtobool
+from kafka import KafkaConsumer
+
+from normalizer import Normalizer
+from normalizer import CollectdValue
+
+try:
+ # For Python 3.0 and later
+ import urllib.request as url
+except ImportError:
+ # Fall back to Python 2's urllib2
+ import urllib2 as url
+
+
+class VESApp(Normalizer):
+ """VES Application"""
+
+ def __init__(self):
+ """Application initialization"""
+ self._app_config = {
+ 'Domain': '127.0.0.1',
+ 'Port': 30000,
+ 'Path': '',
+ 'Username': '',
+ 'Password': '',
+ 'Topic': '',
+ 'UseHttps': False,
+ 'SendEventInterval': 20.0,
+ 'ApiVersion': 5.1,
+ 'KafkaPort': 9092,
+ 'KafkaBroker': 'localhost'
+ }
+
+ def send_data(self, event):
+ """Send event to VES"""
+ server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
+ 's' if self._app_config['UseHttps'] else '',
+ self._app_config['Domain'], int(self._app_config['Port']),
+ '{}'.format('/{}'.format(self._app_config['Path']) if len(
+ self._app_config['Path']) > 0 else ''),
+ int(self._app_config['ApiVersion']), '{}'.format(
+ '/{}'.format(self._app_config['Topic']) if len(
+ self._app_config['Topic']) > 0 else ''))
+ logging.info('Vendor Event Listener is at: {}'.format(server_url))
+ credentials = base64.b64encode('{}:{}'.format(
+ self._app_config['Username'],
+ self._app_config['Password']).encode()).decode()
+ logging.info('Authentication credentials are: {}'.format(credentials))
+ try:
+ request = url.Request(server_url)
+ request.add_header('Authorization', 'Basic {}'.format(credentials))
+ request.add_header('Content-Type', 'application/json')
+ event_str = json.dumps(event).encode()
+ logging.debug("Sending {} to {}".format(event_str, server_url))
+ url.urlopen(request, event_str, timeout=1)
+ logging.debug("Sent data to {} successfully".format(server_url))
+ except url.HTTPError as e:
+ logging.error('Vendor Event Listener exception: {}'.format(e))
+ except url.URLError as e:
+ logging.error(
+ 'Vendor Event Listener is is not reachable: {}'.format(e))
+ except Exception as e:
+ logging.error('Vendor Event Listener error: {}'.format(e))
+
+ def config(self, config):
+ """VES option configuration"""
+ for key, value in config.items('config'):
+ if key in self._app_config:
+ try:
+ if type(self._app_config[key]) == int:
+ value = int(value)
+ elif type(self._app_config[key]) == float:
+ value = float(value)
+ elif type(self._app_config[key]) == bool:
+ value = bool(strtobool(value))
+
+ if isinstance(value, type(self._app_config[key])):
+ self._app_config[key] = value
+ else:
+ logging.error("Type mismatch with %s" % key)
+ sys.exit()
+ except ValueError:
+ logging.error("Incorrect value type for %s" % key)
+ sys.exit()
+ else:
+ logging.error("Incorrect key configuration %s" % key)
+ sys.exit()
+
+ def init(self, configfile, schema_file):
+ if configfile is not None:
+ # read VES configuration file if provided
+ config = ConfigParser.ConfigParser()
+ config.optionxform = lambda option: option
+ config.read(configfile)
+ self.config(config)
+ # initialize normalizer
+ self.initialize(schema_file, self._app_config['SendEventInterval'])
+
+ def run(self):
+ """Consumer JSON data from kafka broker"""
+ kafka_server = '{}:{}'.format(
+ self._app_config.get('KafkaBroker'),
+ self._app_config.get('KafkaPort'))
+ consumer = KafkaConsumer(
+ 'collectd', bootstrap_servers=kafka_server,
+ auto_offset_reset='latest', enable_auto_commit=False,
+ value_deserializer=lambda m: json.loads(m.decode('ascii')))
+
+ for message in consumer:
+ for kafka_data in message.value:
+ # {
+ # u'dstypes': [u'derive'],
+ # u'plugin': u'cpu',
+ # u'dsnames': [u'value'],
+ # u'interval': 10.0,
+ # u'host': u'localhost',
+ # u'values': [99.9978996416267],
+ # u'time': 1502114956.244,
+ # u'plugin_instance': u'44',
+ # u'type_instance': u'idle',
+ # u'type': u'cpu'
+ # }
+ logging.debug('{}:run():data={}'.format(
+ self.__class__.__name__, kafka_data))
+ for ds_name in kafka_data['dsnames']:
+ index = kafka_data['dsnames'].index(ds_name)
+ val_hash = CollectdValue.hash_gen(
+ kafka_data['host'], kafka_data['plugin'],
+ kafka_data['plugin_instance'], kafka_data['type'],
+ kafka_data['type_instance'], ds_name)
+ collector = self.get_collector()
+ val = collector.get(val_hash)
+ if val:
+ # update the value
+ val.value = kafka_data['values'][index]
+ val.time = kafka_data['time']
+ del(val)
+ else:
+ # add new value into the collector
+ val = CollectdValue()
+ val.host = kafka_data['host']
+ val.plugin = kafka_data['plugin']
+ val.plugin_instance = kafka_data['plugin_instance']
+ val.type = kafka_data['type']
+ val.type_instance = kafka_data['type_instance']
+ val.value = kafka_data['values'][index]
+ val.interval = kafka_data['interval']
+ val.time = kafka_data['time']
+ val.ds_name = ds_name
+ collector.add(val)
+
+
+def main():
+ # Parsing cmdline options
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--events-schema", dest="schema", required=True,
+ help="YAML events schema definition", metavar="FILE")
+ parser.add_argument("--config", dest="configfile", default=None,
+ help="Specify config file", metavar="FILE")
+ parser.add_argument("--loglevel", dest="level", default='INFO',
+ choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
+ help="Specify log level (default: %(default)s)",
+ metavar="LEVEL")
+ parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
+ help="Specify log file (default: %(default)s)",
+ metavar="FILE")
+ args = parser.parse_args()
+
+ # Create log file
+ logging.basicConfig(filename=args.logfile,
+ format='%(asctime)s %(message)s',
+ level=args.level)
+ if args.configfile is None:
+ logging.warning("No configfile specified, using default options")
+
+ # Create Application Instance
+ application_instance = VESApp()
+ application_instance.init(args.configfile, args.schema)
+
+ try:
+ # Run the plugin
+ application_instance.run()
+ except KeyboardInterrupt:
+ logging.info(" - Ctrl-C handled, exiting gracefully")
+ except Exception as e:
+ logging.error('{}, {}'.format(type(e), e))
+ finally:
+ application_instance.destroy()
+ sys.exit()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf
new file mode 100644
index 00000000..aee0816c
--- /dev/null
+++ b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf
@@ -0,0 +1,12 @@
+[config]
+Domain = 127.0.0.1
+Port = 30000
+Path = vendor_event_listener
+Topic = example_vnf
+UseHttps = false
+Username =
+Password =
+SendEventInterval = 20
+ApiVersion = 3
+KafkaPort = 9092
+KafkaBroker = localhost