-rw-r--r--  3rd_party/collectd-ves-app/LICENSE (renamed from 3rd_party/collectd-ves-plugin/LICENSE)  18
-rw-r--r--  3rd_party/collectd-ves-app/PSF_LICENSE_AGREEMENT (renamed from 3rd_party/collectd-ves-plugin/PSF_LICENSE_AGREEMENT)  0
-rw-r--r--  3rd_party/collectd-ves-app/ves_app/__init__.py  0
-rw-r--r--  3rd_party/collectd-ves-app/ves_app/host.yaml  214
-rw-r--r--  3rd_party/collectd-ves-app/ves_app/normalizer.py  598
-rw-r--r--  3rd_party/collectd-ves-app/ves_app/ves_app.py  214
-rw-r--r--  3rd_party/collectd-ves-app/ves_app/ves_app_config.conf (renamed from 3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin_config.conf)  3
-rw-r--r--  3rd_party/collectd-ves-plugin/ves_plugin/__init__.py  12
-rw-r--r--  3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py  945
-rw-r--r--  baro_tests/collectd.py  495
-rw-r--r--  baro_tests/config_server.py  445
-rw-r--r--  docs/release/configguide/index.rst  2
-rw-r--r--  docs/release/release-notes/index.rst  2
-rw-r--r--  docs/release/scenarios/os-nosdn-bar-ha/index.rst  15
-rw-r--r--  docs/release/scenarios/os-nosdn-bar-noha/index.rst  15
-rw-r--r--  docs/release/userguide/feature.userguide.rst  75
-rw-r--r--  docs/release/userguide/index.rst  2
-rw-r--r--  src/Makefile  2
-rw-r--r--  src/collectd-openstack-plugins/Makefile (renamed from src/collectd-ceilometer-plugin/Makefile)  4
-rw-r--r--  src/package-list.mk  2
20 files changed, 1614 insertions, 1449 deletions
diff --git a/3rd_party/collectd-ves-plugin/LICENSE b/3rd_party/collectd-ves-app/LICENSE
index 8b515df5..804ec198 100644
--- a/3rd_party/collectd-ves-plugin/LICENSE
+++ b/3rd_party/collectd-ves-app/LICENSE
@@ -10,10 +10,14 @@ so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/3rd_party/collectd-ves-plugin/PSF_LICENSE_AGREEMENT b/3rd_party/collectd-ves-app/PSF_LICENSE_AGREEMENT
index 666648bb..666648bb 100644
--- a/3rd_party/collectd-ves-plugin/PSF_LICENSE_AGREEMENT
+++ b/3rd_party/collectd-ves-app/PSF_LICENSE_AGREEMENT
diff --git a/3rd_party/collectd-ves-app/ves_app/__init__.py b/3rd_party/collectd-ves-app/ves_app/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/3rd_party/collectd-ves-app/ves_app/__init__.py
diff --git a/3rd_party/collectd-ves-app/ves_app/host.yaml b/3rd_party/collectd-ves-app/ves_app/host.yaml
new file mode 100644
index 00000000..a91574cf
--- /dev/null
+++ b/3rd_party/collectd-ves-app/ves_app/host.yaml
@@ -0,0 +1,214 @@
+---
+# Common event header definition (required fields and defaults)
+commonEventHeader: &commonEventHeader
+ domain: N/A
+ eventId: "{system.id}"
+ eventName: ""
+ eventType: Info
+ lastEpochMicrosec: 0
+ priority: Normal
+ reportingEntityId: &reportingEntityId "{system.hostname}"
+ reportingEntityName: *reportingEntityId
+ sequence: 0
+ sourceName: N/A
+ startEpochMicrosec: 0
+ version: 2.0
+
+# Value mapping (used to map collectd notification severity to VES)
+collectdSeverityMapping: &collectdSeverityMapping
+ NOTIF_FAILURE: CRITICAL
+ NOTIF_WARNING: WARNING
+ NOTIF_OKAY: NORMAL
+
+# Measurements definition
+Host Measurements: !Measurements
+ - ITEM-DESC:
+ event:
+ commonEventHeader:
+ <<: *commonEventHeader
+ eventType: hostOS
+ domain: measurementsForVfScaling
+ sourceId: &sourceId "{vl.plugin_instance}"
+ sourceName: *sourceId
+ startEpochMicrosec: !Number "{vl.time}"
+ measurementsForVfScalingFields:
+ measurementsForVfScalingVersion: 2.0
+ additionalFields: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: "/^(?!memory|virt_vcpu|disk_octets|disk_ops|if_packets|if_errors|if_octets|if_dropped).*$/"
+ - ITEM-DESC:
+ name: "{vl.type}-{vl.type_instance}-{vl.ds_name}"
+ value: "{vl.value}"
+ additionalMeasurements: !ArrayItem
+ - SELECT:
+ plugin: "/^(?!virt).*$/"
+ - INDEX-KEY:
+ - plugin
+ - plugin_instance
+ - ITEM-DESC:
+ name: "{vl.plugin}-{vl.plugin_instance}"
+ arrayOfFields: !ArrayItem
+ - SELECT:
+ plugin: "{vl.plugin}"
+ plugin_instance: "{vl.plugin_instance}"
+ - ITEM-DESC:
+ name: "{vl.type}-{vl.type_instance}-{vl.ds_name}"
+ value: "{vl.value}"
+ measurementInterval: !Number "{vl.interval}"
+ memoryUsageArray: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: memory
+ type_instance: total
+ - ITEM-DESC:
+ memoryConfigured: !Bytes2Kibibytes "{vl.value}"
+ vmIdentifier: "{vl.plugin_instance}"
+ memoryUsed: 0.0
+ memoryFree: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: memory
+ type_instance: rss
+ - VALUE: !Bytes2Kibibytes "{vl.value}"
+ - DEFAULT: 0
+ cpuUsageArray: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: virt_vcpu
+ - ITEM-DESC:
+ cpuIdentifier: "{vl.type_instance}"
+ percentUsage: !Number "{vl.value}"
+ vNicPerformanceArray: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_packets
+ ds_name: rx
+ - ITEM-DESC:
+ valuesAreSuspect: "true"
+ vNicIdentifier: "{vl.type_instance}"
+ receivedTotalPacketsAccumulated: !Number "{vl.value}"
+ transmittedTotalPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_packets
+ type_instance: "{vl.type_instance}"
+ ds_name: tx
+ receivedOctetsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_octets
+ type_instance: "{vl.type_instance}"
+ ds_name: rx
+ transmittedOctetsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_octets
+ type_instance: "{vl.type_instance}"
+ ds_name: tx
+ receivedErrorPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_errors
+ type_instance: "{vl.type_instance}"
+ ds_name: rx
+ transmittedErrorPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_errors
+ type_instance: "{vl.type_instance}"
+ ds_name: tx
+ receivedDiscardedPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_dropped
+ type_instance: "{vl.type_instance}"
+ ds_name: rx
+ transmittedDiscardedPacketsAccumulated: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: if_dropped
+ type_instance: "{vl.type_instance}"
+ ds_name: tx
+ diskUsageArray: !ArrayItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: disk_octets
+ ds_name: read
+ - ITEM-DESC:
+ diskIdentifier: "{vl.type_instance}"
+ diskOctetsReadLast: !Number "{vl.value}"
+ diskOctetsWriteLast: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: disk_octets
+ type_instance: "{vl.type_instance}"
+ ds_name: write
+ diskOpsReadLast: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: disk_ops
+ type_instance: "{vl.type_instance}"
+ ds_name: read
+ diskOpsWriteLast: !ValueItem
+ - SELECT:
+ plugin: virt
+ plugin_instance: "{vl.plugin_instance}"
+ type: disk_ops
+ type_instance: "{vl.type_instance}"
+ ds_name: write
+ - SELECT:
+ plugin: virt
+ type_instance: virt_cpu_total
+
+Virt Events: !Events
+ - ITEM-DESC:
+ event:
+ commonEventHeader: &event_commonEventHeader
+ <<: *commonEventHeader
+ domain: fault
+ eventType: Notification
+ sourceId: &event_sourceId "{n.plugin_instance}"
+ sourceName: *event_sourceId
+ lastEpochMicrosec: !Number "{n.time}"
+ startEpochMicrosec: !Number "{n.time}"
+ faultFields: &faultFields
+ alarmInterfaceA: "{n.plugin}-{n.plugin_instance}"
+ alarmCondition: "{n.message}"
+ eventSeverity: !MapValue
+ VALUE: "{n.severity}"
+ TO: *collectdSeverityMapping
+ eventSourceType: hypervisor
+ faultFieldsVersion: 1.1
+ specificProblem: "{n.plugin_instance}-{n.type_instance}"
+ vfStatus: Active
+ - CONDITION:
+ plugin: virt
+
+Host Events: !Events
+ - ITEM-DESC:
+ event:
+ commonEventHeader:
+ <<: *event_commonEventHeader
+ sourceId: "{system.hostname}"
+ sourceName: "{system.hostname}"
+ faultFields:
+ <<: *faultFields
+ eventSourceType: host
+ - CONDITION:
+ plugin: "/^(?!virt).*$/"
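(Editorial note, not part of the patch.) For orientation, the memoryUsageArray definition above is expected to render one entry per guest: the SELECT picks the virt plugin's memory/total sample, !Bytes2Kibibytes converts bytes to kibibytes, and memoryFree is resolved through a !ValueItem that looks up the matching memory/rss sample (falling back to the DEFAULT of 0). A hypothetical guest "vm0" reporting memory/total = 2147483648 bytes and memory/rss = 1073741824 bytes would therefore yield roughly the following entry, shown here as a Python dict literal with invented values:

    {
        'vmIdentifier': 'vm0',
        'memoryConfigured': 2097152.0,  # !Bytes2Kibibytes of the memory/total sample
        'memoryUsed': 0.0,
        'memoryFree': 1048576.0,        # !ValueItem resolved from the memory/rss sample
    }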
diff --git a/3rd_party/collectd-ves-app/ves_app/normalizer.py b/3rd_party/collectd-ves-app/ves_app/normalizer.py
new file mode 100644
index 00000000..899c850b
--- /dev/null
+++ b/3rd_party/collectd-ves-app/ves_app/normalizer.py
@@ -0,0 +1,598 @@
+#
+# Copyright(c) 2017 Intel Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Authors:
+# Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
+#
+
+import yaml
+import logging
+import datetime
+import time
+from threading import RLock
+from threading import Timer
+from threading import Thread
+import re
+
+# import YAML loader
+try:
+ from yaml import CLoader as Loader
+except ImportError:
+ from yaml import Loader
+
+# import synchronized queue
+try:
+ # python 2.x
+ import Queue as queue
+except ImportError:
+ # python 3.x
+ import queue
+
+
+class Config(object):
+ """Configuration class used to pass config option into YAML file"""
+
+ def __init__(self, interval):
+ self.interval = interval
+
+
+class System(object):
+ """System class which provides information like host, time etc., into YAML
+ file"""
+
+ def __init__(self):
+ self.hostname = 'localhost'
+ self._id = 0
+
+ @property
+ def id(self):
+ self._id = self._id + 1
+ return self._id
+
+ @property
+ def time(self):
+ return time.time()
+
+ @property
+ def date(self):
+ return datetime.date.today().isoformat()
+
+
+class ItemIterator(object):
+ """Item iterator returned by Collector class"""
+
+ def __init__(self, collector, items):
+ """Item iterator init"""
+ logging.debug('{}:__init__()'.format(self.__class__.__name__))
+ self._items = items
+ self._collector = collector
+ self._index = 0
+
+ def next(self):
+ """Returns next item from the list"""
+ if self._index == len(self._items):
+ raise StopIteration
+ curr_index = self._index
+ self._index = curr_index + 1
+ return self._items[curr_index]
+
+ def __getitem__(self, key):
+ """get item by index"""
+ return self._items[key]
+
+ def __len__(self):
+ """Return length of elements"""
+ return len(self._items)
+
+ def __del__(self):
+ """Destroy iterator and unlock the collector"""
+ logging.debug('{}:__del__()'.format(self.__class__.__name__))
+ self._collector.unlock()
+
+
+class ItemObject(object):
+ """Item object returned by Collector class"""
+
+ def __init__(self, collector, hash_):
+ """Item object init"""
+ logging.debug('{}:__init__()'.format(self.__class__.__name__))
+ super(ItemObject, self).__setattr__('_collector', collector)
+ super(ItemObject, self).__setattr__('_hash', hash_)
+
+ def __setattr__(self, name, value):
+ t, item = self._collector._metrics[self._hash]
+ logging.debug('{}:__setattr__(name={}, value={})'.format(
+ self.__class__.__name__, name, value))
+ setattr(item, name, value)
+ self._collector._metrics[self._hash] = (time.time(), item)
+
+ def __del__(self):
+ """Destroy item object and unlock the collector"""
+ logging.debug('{}:__del__()'.format(self.__class__.__name__))
+ self._collector.unlock()
+
+
+class Collector(object):
+ """Thread-safe collector with aging feature"""
+
+ def __init__(self, age_timeout):
+ """Initialization"""
+ self._metrics = {}
+ self._lock = RLock()
+ self._age_timeout = age_timeout
+ self._start_age_timer()
+
+ def _start_age_timer(self):
+ """Start age timer"""
+ self._age_timer = Timer(self._age_timeout, self._on_timer)
+ self._age_timer.start()
+
+ def _stop_age_timer(self):
+ """Stop age timer"""
+ self._age_timer.cancel()
+
+ def _on_timer(self):
+ """Age timer"""
+ self._start_age_timer()
+ self._check_aging()
+
+ def _check_aging(self):
+ """Check aging time for all items"""
+ self.lock()
+ for data_hash, data in self._metrics.items():
+ age, item = data
+ if ((time.time() - age) >= self._age_timeout):
+ # aging time has expired, remove the item from the collector
+ logging.debug('{}:_check_aging():value={}'.format(
+ self.__class__.__name__, item))
+ self._metrics.pop(data_hash)
+ del(item)
+ self.unlock()
+
+ def lock(self):
+ """Lock the collector"""
+ logging.debug('{}:lock()'.format(self.__class__.__name__))
+ self._lock.acquire()
+
+ def unlock(self):
+ """Unlock the collector"""
+ logging.debug('{}:unlock()'.format(self.__class__.__name__))
+ self._lock.release()
+
+ def get(self, hash_):
+ self.lock()
+ if hash_ in self._metrics:
+ return ItemObject(self, hash_)
+ self.unlock()
+ return None
+
+ def add(self, item):
+ """Add an item into the collector"""
+ self.lock()
+ logging.debug('{}:add(item={})'.format(self.__class__.__name__, item))
+ self._metrics[hash(item)] = (time.time(), item)
+ self.unlock()
+
+ def items(self, select_list=[]):
+ """Returns locked (safe) item iterator"""
+ metrics = []
+ self.lock()
+ for k, item in self._metrics.items():
+ _, value = item
+ for select in select_list:
+ if value.match(**select):
+ metrics.append(value)
+ return ItemIterator(self, metrics)
+
+ def destroy(self):
+ """Destroy the collector"""
+ self._stop_age_timer()
+
+
+class CollectdData(object):
+ """Base class for Collectd data"""
+
+ def __init__(self, host=None, plugin=None, plugin_instance=None,
+ type_=None, type_instance=None, time_=None):
+ """Class initialization"""
+ self.host = host
+ self.plugin = plugin
+ self.plugin_instance = plugin_instance
+ self.type_instance = type_instance
+ self.type = type_
+ self.time = time_
+
+ @classmethod
+ def is_regular_expression(cls, expr):
+ return True if expr[0] == '/' and expr[-1] == '/' else False
+
+ def match(self, **kargs):
+ # compare the metric
+ for key, value in kargs.items():
+ if self.is_regular_expression(value):
+ if re.match(value[1:-1], getattr(self, key)) is None:
+ return False
+ elif value != getattr(self, key):
+ return False
+ # return match even if kargs is empty
+ return True
+
+
+class CollectdNotification(CollectdData):
+ """Collectd notification"""
+
+ def __init__(self, host=None, plugin=None, plugin_instance=None,
+ type_=None, type_instance=None, severity=None, message=None):
+ super(CollectdNotification, self).__init__(
+ host, plugin, plugin_instance, type_, type_instance)
+ self.severity = severity
+ self.message = message
+
+ def __repr__(self):
+ return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
+ 'type_instance={}, severity={}, message={}, time={})'.format(
+ self.__class__.__name__, self.host, self.plugin,
+ self.plugin_instance, self.type, self.type_instance,
+ self.severity, self.message, self.time)
+
+
+class CollectdValue(CollectdData):
+ """Collectd value"""
+
+ def __init__(self, host=None, plugin=None, plugin_instance=None,
+ type_=None, type_instance=None, ds_name='value', value=None,
+ interval=None):
+ super(CollectdValue, self).__init__(
+ host, plugin, plugin_instance, type_, type_instance)
+ self.value = value
+ self.ds_name = ds_name
+ self.interval = interval
+
+ @classmethod
+ def hash_gen(cls, host, plugin, plugin_instance, type_,
+ type_instance, ds_name):
+ return hash((host, plugin, plugin_instance, type_,
+ type_instance, ds_name))
+
+ def __eq__(self, other):
+ return hash(self) == hash(other) and self.value == other.value
+
+ def __hash__(self):
+ return self.hash_gen(self.host, self.plugin, self.plugin_instance,
+ self.type, self.type_instance, self.ds_name)
+
+ def __repr__(self):
+ return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
+ 'type_instance={}, ds_name={}, value={}, time={})'.format(
+ self.__class__.__name__, self.host, self.plugin,
+ self.plugin_instance, self.type, self.type_instance,
+ self.ds_name, self.value, self.time)
+
+
+class Item(yaml.YAMLObject):
+ """Base class to process tags like ArrayItem/ValueItem"""
+
+ @classmethod
+ def format_node(cls, mapping, metric):
+ if mapping.tag in [
+ 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
+ Number.yaml_tag]:
+ return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric))
+ elif mapping.tag == 'tag:yaml.org,2002:map':
+ values = []
+ for key, value in mapping.value:
+ values.append((yaml.ScalarNode(key.tag, key.value),
+ cls.format_node(value, metric)))
+ return yaml.MappingNode(mapping.tag, values)
+ elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]:
+ values = []
+ for seq in mapping.value:
+ map_values = list()
+ for key, value in seq.value:
+ if key.value == 'SELECT':
+ map_values.append((yaml.ScalarNode(key.tag, key.value),
+ cls.format_node(value, metric)))
+ else:
+ map_values.append((yaml.ScalarNode(key.tag, key.value),
+ value))
+ values.append(yaml.MappingNode(seq.tag, map_values))
+ return yaml.SequenceNode(mapping.tag, values)
+ elif mapping.tag in [MapValue.yaml_tag]:
+ values = []
+ for key, value in mapping.value:
+ if key.value == 'VALUE':
+ values.append((yaml.ScalarNode(key.tag, key.value),
+ cls.format_node(value, metric)))
+ else:
+ values.append((yaml.ScalarNode(key.tag, key.value), value))
+ return yaml.MappingNode(mapping.tag, values)
+ return mapping
+
+
+class ValueItem(Item):
+ """Class to process VlaueItem tag"""
+ yaml_tag = u'!ValueItem'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
+ default, select, value_desc = None, list(), None
+ # find value description
+ for elem in node.value:
+ for key, value in elem.value:
+ if key.value == 'VALUE':
+ assert value_desc is None, "VALUE key already set"
+ value_desc = value
+ if key.value == 'SELECT':
+ select.append(loader.construct_mapping(value))
+ if key.value == 'DEFAULT':
+ assert default is None, "DEFAULT key already set"
+ default = loader.construct_object(value)
+ # if VALUE key isn't given, use default VALUE key
+ # format: `VALUE: !Number '{vl.value}'`
+ if value_desc is None:
+ value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}')
+ # select collectd metric based on SELECT condition
+ metrics = loader.collector.items(select)
+ assert len(metrics) < 2, \
+ 'Wrong SELECT condition, selected {} metrics'.format(len(metrics))
+ if len(metrics) > 0:
+ item = cls.format_node(value_desc, {'vl': metrics[0],
+ 'system': loader.system})
+ return loader.construct_object(item)
+ # nothing has been found by SELECT condition, set to DEFAULT value.
+ assert default is not None, \
+ "No metrics selected by SELECT condition and DEFAULT key isn't set"
+ return default
+
+
+class ArrayItem(Item):
+ """Class to process ArrayItem tag"""
+ yaml_tag = u'!ArrayItem'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
+ loader, node))
+ # e.g.:
+ # SequenceNode(tag=u'!ArrayItem', value=[
+ # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
+ # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
+ # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
+ # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
+ # , ...)
+ # ]), ...
+ # ), (key, value), ... ])
+ # , ... ])
+ assert isinstance(node, yaml.SequenceNode), \
+ "{} tag isn't YAML array".format(cls.__name__)
+ select, index_keys, items, item_desc = list(), list(), list(), None
+ for elem in node.value:
+ for key, value in elem.value:
+ if key.value == 'ITEM-DESC':
+ assert item_desc is None, "ITEM-DESC key already set"
+ item_desc = value
+ if key.value == 'INDEX-KEY':
+ assert len(index_keys) == 0, "INDEX-KEY key already set"
+ index_keys = loader.construct_sequence(value)
+ if key.value == 'SELECT':
+ select.append(loader.construct_mapping(value))
+ # validate item description
+ assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
+ assert len(select) > 0 or len(index_keys) > 0, \
+ "Mandatory key (INDEX-KEY or SELECT) isn't set"
+ metrics = loader.collector.items(select)
+ # select metrics based on INDEX-KEY provided
+ if len(index_keys) > 0:
+ metric_set = set()
+ for metric in metrics:
+ value_params = {}
+ for key in index_keys:
+ value_params[key] = getattr(metric, key)
+ metric_set.add(CollectdValue(**value_params))
+ metrics = list(metric_set)
+ # build items based on SELECT and/or INDEX-KEY criteria
+ for metric in metrics:
+ item = cls.format_node(item_desc,
+ {'vl': metric, 'system': loader.system,
+ 'config': loader.config})
+ items.append(loader.construct_mapping(item))
+ return items
+
+
+class Measurements(ArrayItem):
+ """Class to process Measurements tag"""
+ yaml_tag = u'!Measurements'
+
+
+class Events(Item):
+ """Class to process Events tag"""
+ yaml_tag = u'!Events'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ condition, item_desc = dict(), None
+ for elem in node.value:
+ for key, value in elem.value:
+ if key.value == 'ITEM-DESC':
+ item_desc = value
+ if key.value == 'CONDITION':
+ condition = loader.construct_mapping(value)
+ assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
+ if loader.notification.match(**condition):
+ item = cls.format_node(item_desc, {
+ 'n': loader.notification, 'system': loader.system})
+ return loader.construct_mapping(item)
+ return None
+
+
+class Bytes2Kibibytes(yaml.YAMLObject):
+ """Class to process Bytes2Kibibytes tag"""
+ yaml_tag = u'!Bytes2Kibibytes'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ return round(float(node.value) / 1024.0, 3)
+
+
+class Number(yaml.YAMLObject):
+ """Class to process Number tag"""
+ yaml_tag = u'!Number'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ try:
+ return int(node.value)
+ except ValueError:
+ return float(node.value)
+
+
+class MapValue(yaml.YAMLObject):
+ """Class to process MapValue tag"""
+ yaml_tag = u'!MapValue'
+
+ @classmethod
+ def from_yaml(cls, loader, node):
+ mapping, val = None, None
+ for key, value in node.value:
+ if key.value == 'TO':
+ mapping = loader.construct_mapping(value)
+ if key.value == 'VALUE':
+ val = loader.construct_object(value)
+ assert mapping is not None, "Mandatory TO key isn't set"
+ assert val is not None, "Mandatory VALUE key isn't set"
+ assert val in mapping, \
+ 'Value "{}" cannot be mapped to any of {} values'.format(
+ val, mapping.keys())
+ return mapping[val]
+
+
+class Normalizer(object):
+ """Normalization class which handles events and measurements"""
+
+ def __init__(self):
+ """Init"""
+ self.interval = None
+ self.collector = None
+ self.system = None
+ self.queue = None
+ self.timer = None
+
+ @classmethod
+ def read_configuration(cls, config_file):
+ """read YAML configuration file"""
+ # load YAML events/measurements definition
+ f = open(config_file, 'r')
+ doc_yaml = yaml.compose(f)
+ f.close()
+ # split events & measurements definitions
+ measurements, events = list(), list()
+ for key, value in doc_yaml.value:
+ if value.tag == Measurements.yaml_tag:
+ measurements.append((key, value))
+ if value.tag == Events.yaml_tag:
+ events.append((key, value))
+ measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map',
+ measurements)
+ measurements_stream = yaml.serialize(measurements_yaml)
+ events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events)
+ events_stream = yaml.serialize(events_yaml)
+ # return event & measurements definition
+ return events_stream, measurements_stream
+
+ def initialize(self, config_file, interval):
+ """Initialize the class"""
+ e, m = self.read_configuration(config_file)
+ self.measurements_stream = m
+ self.events_stream = e
+ self.system = System()
+ self.config = Config(interval)
+ self.interval = interval
+ # start collector with aging time = double interval
+ self.collector = Collector(interval * 2)
+ # initialize event thread
+ self.queue = queue.Queue()
+ self.event_thread = Thread(target=self.event_worker)
+ self.event_thread.daemon = True
+ self.event_thread.start()
+ # initialize measurements timer
+ self.start_timer()
+
+ def destroy(self):
+ """Destroy the class"""
+ self.collector.destroy()
+ self.post_event(None) # send stop event
+ self.event_thread.join()
+ self.stop_timer()
+
+ def start_timer(self):
+ """Start measurements timer"""
+ self.timer = Timer(self.interval, self.on_timer)
+ self.timer.start()
+
+ def stop_timer(self):
+ """Stop measurements timer"""
+ self.timer.cancel()
+
+ def on_timer(self):
+ """Measurements timer"""
+ self.start_timer()
+ self.process_measurements()
+
+ def event_worker(self):
+ """Event worker"""
+ while True:
+ event = self.queue.get()
+ if isinstance(event, CollectdNotification):
+ self.process_notify(event)
+ continue
+ # stop event received, exit the worker
+ break
+
+ def get_collector(self):
+ """Get metric collector reference"""
+ return self.collector
+
+ def process_measurements(self):
+ """Process measurements"""
+ loader = Loader(self.measurements_stream)
+ setattr(loader, 'collector', self.collector)
+ setattr(loader, 'system', self.system)
+ setattr(loader, 'config', self.config)
+ measurements = loader.get_data()
+ for measurement_name in measurements:
+ logging.debug('Process "{}" measurements: {}'.format(
+ measurement_name, measurements[measurement_name]))
+ for measurement in measurements[measurement_name]:
+ self.send_data(measurement)
+
+ def process_notify(self, notification):
+ """Process events"""
+ loader = Loader(self.events_stream)
+ setattr(loader, 'notification', notification)
+ setattr(loader, 'system', self.system)
+ notifications = loader.get_data()
+ for notify_name in notifications:
+ logging.debug('Process "{}" notification'.format(notify_name))
+ if notifications[notify_name] is not None:
+ self.send_data(notifications[notify_name])
+
+ def send_data(self, data):
+ """Send data"""
+ assert False, 'send_data() is an abstract function and MUST be overridden'
+
+ def post_event(self, notification):
+ """Post notification into the queue to process"""
+ self.queue.put(notification)
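(Editorial note, not part of the patch.) Normalizer.send_data() is deliberately abstract: a consumer subclasses Normalizer, overrides send_data(), and feeds CollectdValue objects into the collector, as ves_app.py below does with data read from Kafka. A minimal sketch, assuming the host.yaml shipped in this directory and with all sample values invented:

    from normalizer import Normalizer, CollectdValue

    class PrintNormalizer(Normalizer):
        def send_data(self, data):
            # Override of the abstract hook: called for every rendered
            # measurement or event instead of posting it anywhere.
            print(data)

    app = PrintNormalizer()
    app.initialize('host.yaml', 10)  # YAML definitions + send interval in seconds
    app.get_collector().add(CollectdValue(
        host='node1', plugin='virt', plugin_instance='vm0',
        type_='memory', type_instance='total', ds_name='value',
        value=2147483648.0, interval=10))
    # Every 10 seconds the measurements timer re-evaluates the YAML
    # definitions against the cached values and calls send_data() for each
    # rendered measurement; app.destroy() stops the timer and worker thread.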
diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app.py b/3rd_party/collectd-ves-app/ves_app/ves_app.py
new file mode 100644
index 00000000..105c66e2
--- /dev/null
+++ b/3rd_party/collectd-ves-app/ves_app/ves_app.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import sys
+import base64
+import ConfigParser
+import logging
+import argparse
+
+from distutils.util import strtobool
+from kafka import KafkaConsumer
+
+from normalizer import Normalizer
+from normalizer import CollectdValue
+
+try:
+ # For Python 3.0 and later
+ import urllib.request as url
+except ImportError:
+ # Fall back to Python 2's urllib2
+ import urllib2 as url
+
+
+class VESApp(Normalizer):
+ """VES Application"""
+
+ def __init__(self):
+ """Application initialization"""
+ self._app_config = {
+ 'Domain': '127.0.0.1',
+ 'Port': 30000,
+ 'Path': '',
+ 'Username': '',
+ 'Password': '',
+ 'Topic': '',
+ 'UseHttps': False,
+ 'SendEventInterval': 20.0,
+ 'ApiVersion': 5.1,
+ 'KafkaPort': 9092,
+ 'KafkaBroker': 'localhost'
+ }
+
+ def send_data(self, event):
+ """Send event to VES"""
+ server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
+ 's' if self._app_config['UseHttps'] else '',
+ self._app_config['Domain'], int(self._app_config['Port']),
+ '{}'.format('/{}'.format(self._app_config['Path']) if len(
+ self._app_config['Path']) > 0 else ''),
+ int(self._app_config['ApiVersion']), '{}'.format(
+ '/{}'.format(self._app_config['Topic']) if len(
+ self._app_config['Topic']) > 0 else ''))
+ logging.info('Vendor Event Listener is at: {}'.format(server_url))
+ credentials = base64.b64encode('{}:{}'.format(
+ self._app_config['Username'],
+ self._app_config['Password']).encode()).decode()
+ logging.info('Authentication credentials are: {}'.format(credentials))
+ try:
+ request = url.Request(server_url)
+ request.add_header('Authorization', 'Basic {}'.format(credentials))
+ request.add_header('Content-Type', 'application/json')
+ event_str = json.dumps(event).encode()
+ logging.debug("Sending {} to {}".format(event_str, server_url))
+ url.urlopen(request, event_str, timeout=1)
+ logging.debug("Sent data to {} successfully".format(server_url))
+ except url.HTTPError as e:
+ logging.error('Vendor Event Listener exception: {}'.format(e))
+ except url.URLError as e:
+ logging.error(
+ 'Vendor Event Listener is not reachable: {}'.format(e))
+ except Exception as e:
+ logging.error('Vendor Event Listener error: {}'.format(e))
+
+ def config(self, config):
+ """VES option configuration"""
+ for key, value in config.items('config'):
+ if key in self._app_config:
+ try:
+ if type(self._app_config[key]) == int:
+ value = int(value)
+ elif type(self._app_config[key]) == float:
+ value = float(value)
+ elif type(self._app_config[key]) == bool:
+ value = bool(strtobool(value))
+
+ if isinstance(value, type(self._app_config[key])):
+ self._app_config[key] = value
+ else:
+ logging.error("Type mismatch with %s" % key)
+ sys.exit()
+ except ValueError:
+ logging.error("Incorrect value type for %s" % key)
+ sys.exit()
+ else:
+ logging.error("Incorrect key configuration %s" % key)
+ sys.exit()
+
+ def init(self, configfile, schema_file):
+ if configfile is not None:
+ # read VES configuration file if provided
+ config = ConfigParser.ConfigParser()
+ config.optionxform = lambda option: option
+ config.read(configfile)
+ self.config(config)
+ # initialize normalizer
+ self.initialize(schema_file, self._app_config['SendEventInterval'])
+
+ def run(self):
+ """Consumer JSON data from kafka broker"""
+ kafka_server = '{}:{}'.format(
+ self._app_config.get('KafkaBroker'),
+ self._app_config.get('KafkaPort'))
+ consumer = KafkaConsumer(
+ 'collectd', bootstrap_servers=kafka_server,
+ auto_offset_reset='latest', enable_auto_commit=False,
+ value_deserializer=lambda m: json.loads(m.decode('ascii')))
+
+ for message in consumer:
+ for kafka_data in message.value:
+ # {
+ # u'dstypes': [u'derive'],
+ # u'plugin': u'cpu',
+ # u'dsnames': [u'value'],
+ # u'interval': 10.0,
+ # u'host': u'localhost',
+ # u'values': [99.9978996416267],
+ # u'time': 1502114956.244,
+ # u'plugin_instance': u'44',
+ # u'type_instance': u'idle',
+ # u'type': u'cpu'
+ # }
+ logging.debug('{}:run():data={}'.format(
+ self.__class__.__name__, kafka_data))
+ for ds_name in kafka_data['dsnames']:
+ index = kafka_data['dsnames'].index(ds_name)
+ val_hash = CollectdValue.hash_gen(
+ kafka_data['host'], kafka_data['plugin'],
+ kafka_data['plugin_instance'], kafka_data['type'],
+ kafka_data['type_instance'], ds_name)
+ collector = self.get_collector()
+ val = collector.get(val_hash)
+ if val:
+ # update the value
+ val.value = kafka_data['values'][index]
+ val.time = kafka_data['time']
+ del(val)
+ else:
+ # add new value into the collector
+ val = CollectdValue()
+ val.host = kafka_data['host']
+ val.plugin = kafka_data['plugin']
+ val.plugin_instance = kafka_data['plugin_instance']
+ val.type = kafka_data['type']
+ val.type_instance = kafka_data['type_instance']
+ val.value = kafka_data['values'][index]
+ val.interval = kafka_data['interval']
+ val.time = kafka_data['time']
+ val.ds_name = ds_name
+ collector.add(val)
+
+
+def main():
+ # Parsing cmdline options
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--events-schema", dest="schema", required=True,
+ help="YAML events schema definition", metavar="FILE")
+ parser.add_argument("--config", dest="configfile", default=None,
+ help="Specify config file", metavar="FILE")
+ parser.add_argument("--loglevel", dest="level", default='INFO',
+ choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
+ help="Specify log level (default: %(default)s)",
+ metavar="LEVEL")
+ parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
+ help="Specify log file (default: %(default)s)",
+ metavar="FILE")
+ args = parser.parse_args()
+
+ # Create log file
+ logging.basicConfig(filename=args.logfile,
+ format='%(asctime)s %(message)s',
+ level=args.level)
+ if args.configfile is None:
+ logging.warning("No configfile specified, using default options")
+
+ # Create Application Instance
+ application_instance = VESApp()
+ application_instance.init(args.configfile, args.schema)
+
+ try:
+ # Run the plugin
+ application_instance.run()
+ except KeyboardInterrupt:
+ logging.info(" - Ctrl-C handled, exiting gracefully")
+ except Exception as e:
+ logging.error('{}, {}'.format(type(e), e))
+ finally:
+ application_instance.destroy()
+ sys.exit()
+
+
+if __name__ == '__main__':
+ main()
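(Editorial note, not part of the patch.) Given the unconditional "import ConfigParser", the application targets Python 2; with the files shipped in this directory it would typically be launched along these lines (paths are illustrative):

    python ves_app.py --events-schema=host.yaml --config=ves_app_config.conf --loglevel=DEBUG

run() then consumes collectd JSON from the 'collectd' Kafka topic on KafkaBroker:KafkaPort and keeps the metric collector up to date.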
diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin_config.conf b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf
index 1dccd49b..aee0816c 100644
--- a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin_config.conf
+++ b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf
@@ -6,8 +6,7 @@ Topic = example_vnf
UseHttps = false
Username =
Password =
-FunctionalRole = Collectd VES Agent
SendEventInterval = 20
ApiVersion = 3
KafkaPort = 9092
-KafkaBroker = localhost
\ No newline at end of file
+KafkaBroker = localhost
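(Editorial note, not part of the patch.) A complete ves_app_config.conf consistent with the defaults in VESApp.__init__ might look as follows; the [config] section name matches config.items('config') in ves_app.py and every value is illustrative:

    [config]
    Domain = 127.0.0.1
    Port = 30000
    Path =
    Topic = example_vnf
    UseHttps = false
    Username =
    Password =
    SendEventInterval = 20
    ApiVersion = 3
    KafkaPort = 9092
    KafkaBroker = localhost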
diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/__init__.py b/3rd_party/collectd-ves-plugin/ves_plugin/__init__.py
deleted file mode 100644
index 14364a35..00000000
--- a/3rd_party/collectd-ves-plugin/ves_plugin/__init__.py
+++ /dev/null
@@ -1,12 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py
deleted file mode 100644
index 4c313cee..00000000
--- a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py
+++ /dev/null
@@ -1,945 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import sys
-import base64
-import ConfigParser
-import logging
-import argparse
-
-from distutils.util import strtobool
-from kafka import KafkaConsumer
-
-try:
- # For Python 3.0 and later
- import urllib.request as url
-except ImportError:
- # Fall back to Python 2's urllib2
- import urllib2 as url
-import socket
-import time
-from threading import Timer
-from threading import Lock
-
-
-class Event(object):
- """Event header"""
-
- def __init__(self):
- """Construct the common header"""
- self.version = 2.0
- self.event_type = "Info" # use "Info" unless a notification is generated
- self.domain = ""
- self.event_id = ""
- self.source_id = ""
- self.source_name = ""
- self.functional_role = ""
- self.reporting_entity_id = ""
- self.reporting_entity_name = ""
- self.priority = "Normal" # will be derived from event if there is one
- self.start_epoch_microsec = 0
- self.last_epoch_micro_sec = 0
- self.sequence = 0
- self.event_name = ""
- self.internal_header_fields = {}
- self.nfc_naming_code = ""
- self.nf_naming_code = ""
-
- def get_json(self):
- """Get the object of the datatype"""
- obj = {}
- obj['version'] = self.version
- obj['eventType'] = self.event_type
- obj['domain'] = self.domain
- obj['eventId'] = self.event_id
- obj['sourceId'] = self.source_id
- obj['sourceName'] = self.source_name
- obj['reportingEntityId'] = self.reporting_entity_id
- obj['reportingEntityName'] = self.reporting_entity_name
- obj['priority'] = self.priority
- obj['startEpochMicrosec'] = self.start_epoch_microsec
- obj['lastEpochMicrosec'] = self.last_epoch_micro_sec
- obj['sequence'] = self.sequence
- obj['eventName'] = self.event_name
- obj['internalHeaderFields'] = self.internal_header_fields
- obj['nfcNamingCode'] = self.nfc_naming_code
- obj['nfNamingCode'] = self.nf_naming_code
- return json.dumps({
- 'event': {
- 'commonEventHeader': obj,
- self.get_name(): self.get_obj()
- }
- }).encode()
-
- def get_name():
- assert False, 'abstract method get_name() is not implemented'
-
- def get_obj():
- assert False, 'abstract method get_obj() is not implemented'
-
-
-class Field(object):
- """field datatype"""
-
- def __init__(self, name, value):
- self.name = name
- self.value = value
-
- def get_obj(self):
- return {
- 'name': self.name,
- 'value': self.value
- }
-
-
-class NamedArrayOfFields(object):
- """namedArrayOfFields datatype"""
-
- def __init__(self, name):
- self.name = name
- self.array_of_fields = []
-
- def add(self, field):
- self.array_of_fields.append(field.get_obj())
-
- def get_obj(self):
- return {
- 'name': self.name,
- 'arrayOfFields': self.array_of_fields
- }
-
-
-class VESDataType(object):
- """ Base VES datatype """
-
- def set_optional(self, obj, key, val):
- if val is not None:
- obj[key] = val
-
-
-class DiskUsage(VESDataType):
- """diskUsage datatype"""
-
- def __init__(self, identifier):
- self.disk_identifier = identifier
- self.disk_io_time_avg = None
- self.disk_io_time_last = None
- self.disk_io_time_max = None
- self.disk_io_time_min = None
- self.disk_merged_read_avg = None
- self.disk_merged_read_last = None
- self.disk_merged_read_max = None
- self.disk_merged_read_min = None
- self.disk_merged_write_avg = None
- self.disk_merged_write_last = None
- self.disk_merged_write_max = None
- self.disk_merged_write_min = None
- self.disk_octets_read_avg = None
- self.disk_octets_read_last = None
- self.disk_octets_read_max = None
- self.disk_octets_read_min = None
- self.disk_octets_write_avg = None
- self.disk_octets_write_last = None
- self.disk_octets_write_max = None
- self.disk_octets_write_min = None
- self.disk_ops_read_avg = None
- self.disk_ops_read_last = None
- self.disk_ops_read_max = None
- self.disk_ops_read_min = None
- self.disk_ops_write_avg = None
- self.disk_ops_write_last = None
- self.disk_ops_write_max = None
- self.disk_ops_write_min = None
- self.disk_pending_operations_avg = None
- self.disk_pending_operations_last = None
- self.disk_pending_operations_max = None
- self.disk_pending_operations_min = None
- self.disk_time_read_avg = None
- self.disk_time_read_last = None
- self.disk_time_read_max = None
- self.disk_time_read_min = None
- self.disk_time_write_avg = None
- self.disk_time_write_last = None
- self.disk_time_write_max = None
- self.disk_time_write_min = None
-
- def get_obj(self):
- obj = {
- # required
- 'diskIdentifier': self.disk_identifier
- }
- self.set_optional(obj, 'diskIoTimeAvg', self.disk_io_time_avg)
- self.set_optional(obj, 'diskIoTimeLast', self.disk_io_time_last)
- self.set_optional(obj, 'diskIoTimeMax', self.disk_io_time_max)
- self.set_optional(obj, 'diskIoTimeMin', self.disk_io_time_min)
- self.set_optional(obj, 'diskMergedReadAvg', self.disk_merged_read_avg)
- self.set_optional(obj, 'diskMergedReadLast', self.disk_merged_read_last)
- self.set_optional(obj, 'diskMergedReadMax', self.disk_merged_read_max)
- self.set_optional(obj, 'diskMergedReadMin', self.disk_merged_read_min)
- self.set_optional(obj, 'diskMergedWriteAvg', self.disk_merged_write_avg)
- self.set_optional(obj, 'diskMergedWriteLast', self.disk_merged_write_last)
- self.set_optional(obj, 'diskMergedWriteMax', self.disk_merged_write_max)
- self.set_optional(obj, 'diskMergedWriteMin', self.disk_merged_write_min)
- self.set_optional(obj, 'diskOctetsReadAvg', self.disk_octets_read_avg)
- self.set_optional(obj, 'diskOctetsReadLast', self.disk_octets_read_last)
- self.set_optional(obj, 'diskOctetsReadMax', self.disk_octets_read_max)
- self.set_optional(obj, 'diskOctetsReadMin', self.disk_octets_read_min)
- self.set_optional(obj, 'diskOctetsWriteAvg', self.disk_octets_write_avg)
- self.set_optional(obj, 'diskOctetsWriteLast', self.disk_octets_write_last)
- self.set_optional(obj, 'diskOctetsWriteMax', self.disk_octets_write_max)
- self.set_optional(obj, 'diskOctetsWriteMin', self.disk_octets_write_min)
- self.set_optional(obj, 'diskOpsReadAvg', self.disk_ops_read_avg)
- self.set_optional(obj, 'diskOpsReadLast', self.disk_ops_read_last)
- self.set_optional(obj, 'diskOpsReadMax', self.disk_ops_read_max)
- self.set_optional(obj, 'diskOpsReadMin', self.disk_ops_read_min)
- self.set_optional(obj, 'diskOpsWriteAvg', self.disk_ops_write_avg)
- self.set_optional(obj, 'diskOpsWriteLast', self.disk_ops_write_last)
- self.set_optional(obj, 'diskOpsWriteMax', self.disk_ops_write_max)
- self.set_optional(obj, 'diskOpsWriteMin', self.disk_ops_write_min)
- self.set_optional(obj, 'diskPendingOperationsAvg', self.disk_pending_operations_avg)
- self.set_optional(obj, 'diskPendingOperationsLast', self.disk_pending_operations_last)
- self.set_optional(obj, 'diskPendingOperationsMax', self.disk_pending_operations_max)
- self.set_optional(obj, 'diskPendingOperationsMin', self.disk_pending_operations_min)
- self.set_optional(obj, 'diskTimeReadAvg', self.disk_time_read_avg)
- self.set_optional(obj, 'diskTimeReadLast', self.disk_time_read_last)
- self.set_optional(obj, 'diskTimeReadMax', self.disk_time_read_max)
- self.set_optional(obj, 'diskTimeReadMin', self.disk_time_read_min)
- self.set_optional(obj, 'diskTimeWriteAvg', self.disk_time_write_avg)
- self.set_optional(obj, 'diskTimeWriteLast', self.disk_time_write_last)
- self.set_optional(obj, 'diskTimeWriteMax', self.disk_time_write_max)
- self.set_optional(obj, 'diskTimeWriteMin', self.disk_time_write_min)
- return obj
-
-
-class VNicPerformance(VESDataType):
- """vNicPerformance datatype"""
-
- def __init__(self, identifier):
- self.received_broadcast_packets_accumulated = None
- self.received_broadcast_packets_delta = None
- self.received_discarded_packets_accumulated = None
- self.received_discarded_packets_delta = None
- self.received_error_packets_accumulated = None
- self.received_error_packets_delta = None
- self.received_multicast_packets_accumulated = None
- self.received_multicast_packets_delta = None
- self.received_octets_accumulated = None
- self.received_octets_delta = None
- self.received_total_packets_accumulated = None
- self.received_total_packets_delta = None
- self.received_unicast_packets_accumulated = None
- self.received_unicast_packets_delta = None
- self.transmitted_broadcast_packets_accumulated = None
- self.transmitted_broadcast_packets_delta = None
- self.transmitted_discarded_packets_accumulated = None
- self.transmitted_discarded_packets_delta = None
- self.transmitted_error_packets_accumulated = None
- self.transmitted_error_packets_delta = None
- self.transmitted_multicast_packets_accumulated = None
- self.transmitted_multicast_packets_delta = None
- self.transmitted_octets_accumulated = None
- self.transmitted_octets_delta = None
- self.transmitted_total_packets_accumulated = None
- self.transmitted_total_packets_delta = None
- self.transmitted_unicast_packets_accumulated = None
- self.transmitted_unicast_packets_delta = None
- self.values_are_suspect = 'true'
- self.v_nic_identifier = identifier
-
- def get_obj(self):
- obj = {
- # required
- 'valuesAreSuspect': self.values_are_suspect,
- 'vNicIdentifier': self.v_nic_identifier
- }
- # optional
- self.set_optional(obj, 'receivedBroadcastPacketsAccumulated', self.received_broadcast_packets_accumulated)
- self.set_optional(obj, 'receivedBroadcastPacketsDelta', self.received_broadcast_packets_delta)
- self.set_optional(obj, 'receivedDiscardedPacketsAccumulated', self.received_discarded_packets_accumulated)
- self.set_optional(obj, 'receivedDiscardedPacketsDelta', self.received_discarded_packets_delta)
- self.set_optional(obj, 'receivedErrorPacketsAccumulated', self.received_error_packets_accumulated)
- self.set_optional(obj, 'receivedErrorPacketsDelta', self.received_error_packets_delta)
- self.set_optional(obj, 'receivedMulticastPacketsAccumulated', self.received_multicast_packets_accumulated)
- self.set_optional(obj, 'receivedMulticastPacketsDelta', self.received_multicast_packets_delta)
- self.set_optional(obj, 'receivedOctetsAccumulated', self.received_octets_accumulated)
- self.set_optional(obj, 'receivedOctetsDelta', self.received_octets_delta)
- self.set_optional(obj, 'receivedTotalPacketsAccumulated', self.received_total_packets_accumulated)
- self.set_optional(obj, 'receivedTotalPacketsDelta', self.received_total_packets_delta)
- self.set_optional(obj, 'receivedUnicastPacketsAccumulated', self.received_unicast_packets_accumulated)
- self.set_optional(obj, 'receivedUnicastPacketsDelta', self.received_unicast_packets_delta)
- self.set_optional(obj, 'transmittedBroadcastPacketsAccumulated', self.transmitted_broadcast_packets_accumulated)
- self.set_optional(obj, 'transmittedBroadcastPacketsDelta', self.transmitted_broadcast_packets_delta)
- self.set_optional(obj, 'transmittedDiscardedPacketsAccumulated', self.transmitted_discarded_packets_accumulated)
- self.set_optional(obj, 'transmittedDiscardedPacketsDelta', self.transmitted_discarded_packets_delta)
- self.set_optional(obj, 'transmittedErrorPacketsAccumulated', self.transmitted_error_packets_accumulated)
- self.set_optional(obj, 'transmittedErrorPacketsDelta', self.transmitted_error_packets_delta)
- self.set_optional(obj, 'transmittedMulticastPacketsAccumulated', self.transmitted_multicast_packets_accumulated)
- self.set_optional(obj, 'transmittedMulticastPacketsDelta', self.transmitted_multicast_packets_delta)
- self.set_optional(obj, 'transmittedOctetsAccumulated', self.transmitted_octets_accumulated)
- self.set_optional(obj, 'transmittedOctetsDelta', self.transmitted_octets_delta)
- self.set_optional(obj, 'transmittedTotalPacketsAccumulated', self.transmitted_total_packets_accumulated)
- self.set_optional(obj, 'transmittedTotalPacketsDelta', self.transmitted_total_packets_delta)
- self.set_optional(obj, 'transmittedUnicastPacketsAccumulated', self.transmitted_unicast_packets_accumulated)
- self.set_optional(obj, 'transmittedUnicastPacketsDelta', self.transmitted_unicast_packets_delta)
- return obj
-
-
-class CpuUsage(VESDataType):
- """cpuUsage datatype"""
-
- def __init__(self, identifier):
- self.cpu_identifier = identifier
- self.cpu_idle = None
- self.cpu_usage_interrupt = None
- self.cpu_usage_nice = None
- self.cpu_usage_soft_irq = None
- self.cpu_usage_steal = None
- self.cpu_usage_system = None
- self.cpu_usage_user = None
- self.cpu_wait = None
- self.percent_usage = 0
-
- def get_obj(self):
- obj = {
- # required
- 'cpuIdentifier': self.cpu_identifier,
- 'percentUsage': self.percent_usage
- }
- # optional
- self.set_optional(obj, 'cpuIdle', self.cpu_idle)
- self.set_optional(obj, 'cpuUsageInterrupt', self.cpu_usage_interrupt)
- self.set_optional(obj, 'cpuUsageNice', self.cpu_usage_nice)
- self.set_optional(obj, 'cpuUsageSoftIrq', self.cpu_usage_soft_irq)
- self.set_optional(obj, 'cpuUsageSteal', self.cpu_usage_steal)
- self.set_optional(obj, 'cpuUsageSystem', self.cpu_usage_system)
- self.set_optional(obj, 'cpuUsageUser', self.cpu_usage_user)
- self.set_optional(obj, 'cpuWait', self.cpu_wait)
- return obj
-
-
-class MemoryUsage(VESDataType):
- """memoryUsage datatype"""
-
- def __init__(self, identifier):
- self.memory_buffered = None
- self.memory_cached = None
- self.memory_configured = None
- self.memory_free = None
- self.memory_slab_recl = None
- self.memory_slab_unrecl = None
- self.memory_used = None
- self.vm_identifier = identifier
-
- def __str__(self):
- """ for debug purposes """
- return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n' \
- 'cached : {cached}\nconfigured : {configured}\nfree : {free}\n' \
- 'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n' \
- 'used : {used}\n'.format(buffered=self.memory_buffered,
- cached=self.memory_cached, configured=self.memory_configured,
- free=self.memory_free, slab_recl=self.memory_slab_recl,
- slab_unrecl=self.memory_slab_unrecl, used=self.memory_used,
- vm_identifier=self.vm_identifier)
-
- def get_memory_free(self):
- if self.memory_free is None:
- # calculate the free memory
- if None not in (self.memory_configured, self.memory_used):
- return self.memory_configured - self.memory_used
- else:
- # required field, so return zero
- return 0
- else:
- return self.memory_free
-
- def get_memory_used(self):
- if self.memory_used is None:
- # calculate the memory used
- if None not in (self.memory_configured, self.memory_free, self.memory_buffered,
- self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
- return self.memory_configured - (self.memory_free +
- self.memory_buffered + self.memory_cached +
- self.memory_slab_recl + self.memory_slab_unrecl)
- else:
- # required field, so return zero
- return 0
- else:
- return self.memory_used
-
- def get_memory_total(self):
- if self.memory_configured is None:
- # calculate the total memory
- if None not in (self.memory_used, self.memory_free, self.memory_buffered,
- self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
- return (self.memory_used + self.memory_free +
- self.memory_buffered + self.memory_cached +
- self.memory_slab_recl + self.memory_slab_unrecl)
- else:
- return None
- else:
- return self.memory_configured
-
- def get_obj(self):
- obj = {
- # required fields
- 'memoryFree': self.get_memory_free(),
- 'memoryUsed': self.get_memory_used(),
- 'vmIdentifier': self.vm_identifier
- }
- # optional fields
- self.set_optional(obj, 'memoryBuffered', self.memory_buffered)
- self.set_optional(obj, 'memoryCached', self.memory_cached)
- self.set_optional(obj, 'memoryConfigured', self.memory_configured)
- self.set_optional(obj, 'memorySlabRecl', self.memory_slab_recl)
- self.set_optional(obj, 'memorySlabUnrecl', self.memory_slab_unrecl)
- return obj
-
-
-class MeasurementsForVfScaling(Event):
- """MeasurementsForVfScaling datatype"""
-
- def __init__(self, event_id):
- """Construct the header"""
- super(MeasurementsForVfScaling, self).__init__()
- # common attributes
- self.domain = "measurementsForVfScaling"
- self.event_type = 'hostOS'
- self.event_id = event_id
- # measurement attributes
- self.additional_measurements = []
- self.codec_usage_array = []
- self.concurrent_sessions = 0
- self.configured_entities = 0
- self.cpu_usage_array = []
- self.feature_usage_array = []
- self.filesystem_usage_array = []
- self.latency_distribution = []
- self.mean_request_latency = 0
- self.measurement_interval = 0
- self.number_of_media_ports_in_use = 0
- self.request_rate = 0
- self.vnfc_scaling_metric = 0
- self.additional_fields = []
- self.additional_objects = []
- self.disk_usage_array = []
- self.measurements_for_vf_scaling_version = 2.0
- self.memory_usage_array = []
- self.v_nic_performance_array = []
-
- def add_additional_measurement(self, named_array):
- self.additional_measurements.append(named_array.get_obj())
-
- def add_additional_fields(self, field):
- self.additional_fields.append(field.get_obj())
-
- def add_memory_usage(self, mem_usage):
- self.memory_usage_array.append(mem_usage.get_obj())
-
- def add_cpu_usage(self, cpu_usage):
- self.cpu_usage_array.append(cpu_usage.get_obj())
-
- def add_v_nic_performance(self, nic_performance):
- self.v_nic_performance_array.append(nic_performance.get_obj())
-
- def add_disk_usage(self, disk_usage):
- self.disk_usage_array.append(disk_usage.get_obj())
-
- def get_obj(self):
- """Get the object of the datatype"""
- obj = {}
- obj['additionalMeasurements'] = self.additional_measurements
- obj['codecUsageArray'] = self.codec_usage_array
- obj['concurrentSessions'] = self.concurrent_sessions
- obj['configuredEntities'] = self.configured_entities
- obj['cpuUsageArray'] = self.cpu_usage_array
- obj['featureUsageArray'] = self.feature_usage_array
- obj['filesystemUsageArray'] = self.filesystem_usage_array
- obj['latencyDistribution'] = self.latency_distribution
- obj['meanRequestLatency'] = self.mean_request_latency
- obj['measurementInterval'] = self.measurement_interval
- obj['numberOfMediaPortsInUse'] = self.number_of_media_ports_in_use
- obj['requestRate'] = self.request_rate
- obj['vnfcScalingMetric'] = self.vnfc_scaling_metric
- obj['additionalFields'] = self.additional_fields
- obj['additionalObjects'] = self.additional_objects
- obj['diskUsageArray'] = self.disk_usage_array
- obj['measurementsForVfScalingVersion'] = self.measurements_for_vf_scaling_version
- obj['memoryUsageArray'] = self.memory_usage_array
- obj['vNicPerformanceArray'] = self.v_nic_performance_array
- return obj
-
- def get_name(self):
- """Name of datatype"""
- return "measurementsForVfScalingFields"
-
-
-class Fault(Event):
- """Fault datatype"""
-
- def __init__(self, event_id):
- """Construct the header"""
- super(Fault, self).__init__()
- # common attributes
- self.domain = "fault"
- self.event_id = event_id
- self.event_type = "Fault"
- # fault attributes
- self.fault_fields_version = 1.1
- self.event_severity = 'NORMAL'
- self.event_source_type = 'other(0)'
- self.alarm_condition = ''
- self.specific_problem = ''
- self.vf_status = 'Active'
- self.alarm_interface_a = ''
- self.alarm_additional_information = []
- self.event_category = ""
-
- def get_name(self):
- """Name of datatype"""
- return 'faultFields'
-
- def get_obj(self):
- """Get the object of the datatype"""
- obj = {}
- obj['faultFieldsVersion'] = self.fault_fields_version
- obj['eventSeverity'] = self.event_severity
- obj['eventSourceType'] = self.event_source_type
- obj['alarmCondition'] = self.alarm_condition
- obj['specificProblem'] = self.specific_problem
- obj['vfStatus'] = self.vf_status
- obj['alarmInterfaceA'] = self.alarm_interface_a
- obj['alarmAdditionalInformation'] = self.alarm_additional_information
- obj['eventCategory'] = self.event_category
- return obj
-
-
-class VESApp(object):
- """VES Application"""
-
- def __init__(self):
- """Application initialization"""
- self.__plugin_data_cache = {
- 'cpu': {'interval': 0.0, 'vls': []},
- 'virt': {'interval': 0.0, 'vls': []},
- 'disk': {'interval': 0.0, 'vls': []},
- 'interface': {'interval': 0.0, 'vls': []},
- 'memory': {'interval': 0.0, 'vls': []}
- }
- self.__app_config = {
- 'Domain': '127.0.0.1',
- 'Port': 30000,
- 'Path': '',
- 'Username': '',
- 'Password': '',
- 'Topic': '',
- 'UseHttps': False,
- 'SendEventInterval': 20.0,
- 'FunctionalRole': 'Collectd VES Agent',
- 'ApiVersion': 5.1,
- 'KafkaPort': 9092,
- 'KafkaBroker': 'localhost'
- }
- self.__host_name = None
- self.__ves_timer = None
- self.__lock = Lock()
- self.__event_id = 0
-
- def get_event_id(self):
- """get event id"""
- self.__event_id += 1
- return str(self.__event_id)
-
- def lock(self):
- """Lock the application"""
- self.__lock.acquire()
-
- def unlock(self):
- """Unlock the application"""
- self.__lock.release()
-
- def start_timer(self):
- """Start event timer"""
- self.__ves_timer = Timer(self.__app_config['SendEventInterval'], self.__on_time)
- self.__ves_timer.start()
-
- def stop_timer(self):
- """Stop event timer"""
- self.__ves_timer.cancel()
-
- def __on_time(self):
- """Timer thread"""
- self.event_timer()
- self.start_timer()
-
- def event_send(self, event):
- """Send event to VES"""
- server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
- 's' if self.__app_config['UseHttps'] else '', self.__app_config['Domain'],
- int(self.__app_config['Port']), '{}'.format(
- '/{}'.format(self.__app_config['Path']) if (len(self.__app_config['Path']) > 0) else ''),
- int(self.__app_config['ApiVersion']), '{}'.format(
- '/{}'.format(self.__app_config['Topic']) if (len(self.__app_config['Topic']) > 0) else ''))
- logging.info('Vendor Event Listener is at: {}'.format(server_url))
- credentials = base64.b64encode('{}:{}'.format(
- self.__app_config['Username'], self.__app_config['Password']).encode()).decode()
- logging.info('Authentication credentials are: {}'.format(credentials))
- try:
- request = url.Request(server_url)
- request.add_header('Authorization', 'Basic {}'.format(credentials))
- request.add_header('Content-Type', 'application/json')
- logging.debug("Sending {} to {}".format(event.get_json(), server_url))
- vel = url.urlopen(request, event.get_json(), timeout=1)
- logging.debug("Sent data to {} successfully".format(server_url))
- except url.HTTPError as e:
- logging.error('Vendor Event Listener exception: {}'.format(e))
- except url.URLError as e:
-            logging.error('Vendor Event Listener is not reachable: {}'.format(e))
- except Exception as e:
- logging.error('Vendor Event Listener error: {}'.format(e))
-
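For reference, the listener URL assembled by event_send() above has the following shape. This is a minimal illustrative sketch, not part of the patch; the helper name build_ves_url is an assumption, while the config keys mirror the __app_config defaults defined in VESApp.__init__ below.

# Sketch only: the same URL construction as event_send(), written out flat.
def build_ves_url(cfg):
    scheme = 'https' if cfg['UseHttps'] else 'http'
    path = '/{}'.format(cfg['Path']) if cfg['Path'] else ''
    topic = '/{}'.format(cfg['Topic']) if cfg['Topic'] else ''
    return '{}://{}:{}{}/eventListener/v{}{}'.format(
        scheme, cfg['Domain'], int(cfg['Port']), path,
        int(cfg['ApiVersion']), topic)

# With the defaults shown above this yields:
# http://127.0.0.1:30000/eventListener/v5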
- def bytes_to_kb(self, bytes):
- """Convert bytes to kibibytes"""
- return round((bytes / 1024.0), 3)
-
- def get_hostname(self):
- return socket.gethostname()
-
- def send_host_measurements(self):
- # get list of all VMs
- virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total',
- mark_as_read=False)
- vm_names = [x['plugin_instance'] for x in virt_vcpu_total]
- for vm_name in vm_names:
- # make sure that 'virt' plugin cache is up-to-date
- vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name,
- mark_as_read=False)
- us_up_to_date = True
- for vm_value in vm_values:
-                if not vm_value['updated']:
- logging.warning("%s" % vm_value)
- us_up_to_date = False
- break
- if not us_up_to_date:
-                # one of the cached values is not up-to-date, skip this VM
- logging.warning("virt collectd cache values are not up-to-date for {}".format(vm_name))
- continue
- # values are up-to-date, create an event message
- measurement = MeasurementsForVfScaling(self.get_event_id())
- measurement.functional_role = self.__app_config['FunctionalRole']
- # fill out reporting_entity
- measurement.reporting_entity_id = self.get_hostname()
- measurement.reporting_entity_name = measurement.reporting_entity_id
- # set source as a host value
- measurement.source_id = vm_name
- measurement.source_name = measurement.source_id
- # fill out EpochMicrosec (convert to us)
- measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
- # plugin interval
- measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
- # memoryUsage
- mem_usage = MemoryUsage(vm_name)
- memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='memory', type_instance='total')
- memory_unused = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='memory', type_instance='unused')
- memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='memory', type_instance='rss')
- if len(memory_total) > 0:
- mem_usage.memory_configured = self.bytes_to_kb(memory_total[0]['values'][0])
- if len(memory_unused) > 0:
- mem_usage.memory_free = self.bytes_to_kb(memory_unused[0]['values'][0])
- elif len(memory_rss) > 0:
- mem_usage.memory_free = self.bytes_to_kb(memory_rss[0]['values'][0])
- # since, "used" metric is not provided by virt plugn, set the rest of the memory stats
- # to zero to calculate used based on provided stats only
- mem_usage.memory_buffered = mem_usage.memory_cached = mem_usage.memory_slab_recl = \
- mem_usage.memory_slab_unrecl = 0
- measurement.add_memory_usage(mem_usage)
- # cpuUsage
- virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
- plugin_name='virt', type_name='virt_vcpu')
- for virt_vcpu in virt_vcpus:
- cpu_usage = CpuUsage(virt_vcpu['type_instance'])
- cpu_usage.percent_usage = self.cpu_ns_to_percentage(virt_vcpu)
- measurement.add_cpu_usage(cpu_usage)
- # vNicPerformance
- if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_packets', mark_as_read=False)
- if_names = [x['type_instance'] for x in if_packets]
- for if_name in if_names:
- if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_packets', type_instance=if_name)
- if_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_octets', type_instance=if_name)
- if_errors = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_errors', type_instance=if_name)
- if_dropped = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='if_dropped', type_instance=if_name)
- v_nic_performance = VNicPerformance(if_name)
- v_nic_performance.received_total_packets_accumulated = if_packets[0]['values'][0]
- v_nic_performance.transmitted_total_packets_accumulated = if_packets[0]['values'][1]
- v_nic_performance.received_octets_accumulated = if_octets[0]['values'][0]
- v_nic_performance.transmitted_octets_accumulated = if_octets[0]['values'][1]
- v_nic_performance.received_error_packets_accumulated = if_errors[0]['values'][0]
- v_nic_performance.transmitted_error_packets_accumulated = if_errors[0]['values'][1]
- v_nic_performance.received_discarded_packets_accumulated = if_dropped[0]['values'][0]
- v_nic_performance.transmitted_discarded_packets_accumulated = if_dropped[0]['values'][1]
- measurement.add_v_nic_performance(v_nic_performance)
- # diskUsage
- disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='disk_octets', mark_as_read=False)
- disk_names = [x['type_instance'] for x in disk_octets]
- for disk_name in disk_names:
- disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='disk_octets', type_instance=disk_name)
- disk_ops = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='disk_ops', type_instance=disk_name)
- disk_usage = DiskUsage(disk_name)
- disk_usage.disk_octets_read_last = disk_octets[0]['values'][0]
- disk_usage.disk_octets_write_last = disk_octets[0]['values'][1]
- disk_usage.disk_ops_read_last = disk_ops[0]['values'][0]
- disk_usage.disk_ops_write_last = disk_ops[0]['values'][1]
- measurement.add_disk_usage(disk_usage)
- # add additional measurements (perf)
- perf_values = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
- type_name='perf')
- named_array = NamedArrayOfFields('perf')
- for perf in perf_values:
- named_array.add(Field(perf['type_instance'], str(perf['values'][0])))
- measurement.add_additional_measurement(named_array)
- # add host values as additional measurements
- self.set_additional_fields(measurement, exclude_plugins=['virt'])
- # send event to the VES
- self.event_send(measurement)
- if len(vm_names) > 0:
- # mark the additional measurements metrics as read
- self.mark_cache_values_as_read(exclude_plugins=['virt'])
-
- def event_timer(self):
- """Event timer thread"""
- self.lock()
- try:
- self.send_host_measurements()
- finally:
- self.unlock()
-
- def mark_cache_values_as_read(self, exclude_plugins=None):
- """mark the cache values as read"""
- for plugin_name in self.__plugin_data_cache.keys():
-            if exclude_plugins is not None and plugin_name in exclude_plugins:
-                # skip excluded plugins
-                continue
- for val in self.__plugin_data_cache[plugin_name]['vls']:
- val['updated'] = False
-
- def set_additional_measurements(self, measurement, exclude_plugins=None):
- """Set addition measurement filed with host/guets values"""
- # add host/guest values as additional measurements
- for plugin_name in self.__plugin_data_cache.keys():
-            if exclude_plugins is not None and plugin_name in exclude_plugins:
-                # skip excluded plugins
-                continue
- for val in self.__plugin_data_cache[plugin_name]['vls']:
- if val['updated']:
- array_name = self.make_dash_string(plugin_name, val['plugin_instance'],
- val['type_instance'])
- named_array = NamedArrayOfFields(array_name)
-
- for index in range(len(val['dsnames'])):
- mname = '{}-{}'.format(val['type'], val['dsnames'][index])
- named_array.add(Field(mname, str(val['values'][index])))
- measurement.add_additional_measurement(named_array);
- val['updated'] = False
-
- def set_additional_fields(self, measurement, exclude_plugins=None):
- # set host values as additional fields
- for plugin_name in self.__plugin_data_cache.keys():
-            if exclude_plugins is not None and plugin_name in exclude_plugins:
-                # skip excluded plugins
-                continue
- for val in self.__plugin_data_cache[plugin_name]['vls']:
- if val['updated']:
- name_prefix = self.make_dash_string(plugin_name, val['plugin_instance'],
- val['type_instance'])
-
- for index in range(len(val['dsnames'])):
- field_name = self.make_dash_string(name_prefix, val['type'], val['dsnames'][index])
- measurement.add_additional_fields(Field(field_name, str(val['values'][index])))
-
- def cpu_ns_to_percentage(self, vl):
- """Convert CPU usage ns to CPU %"""
- total = vl['values'][0]
- total_time = vl['time']
- pre_total = vl['pre_values'][0]
- pre_total_time = vl['pre_time']
- if (total_time - pre_total_time) == 0:
- # return zero usage if time diff is zero
- return 0.0
- percent = (100.0 * (total - pre_total)) / ((total_time - pre_total_time) * 1000000000.0)
- logging.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format(
- pre_total_time, pre_total, total_time, total, round(percent, 2)))
- return round(percent, 2)
-
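A quick worked example of the conversion above (illustrative numbers, not from the source): a vCPU that consumed 0.5 s of CPU time over a 10 s collectd interval reports 5 % usage.

# Illustrative values only; same formula as cpu_ns_to_percentage().
total, pre_total = 7.5e9, 7.0e9            # cumulative CPU time in ns
total_time, pre_total_time = 110.0, 100.0  # sample timestamps in seconds
percent = (100.0 * (total - pre_total)) / ((total_time - pre_total_time) * 1000000000.0)
# -> 100 * 5e8 / (10 * 1e9) = 5.0 (%)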
- def make_dash_string(self, *args):
- """Join non empty strings with dash symbol"""
- return '-'.join(filter(lambda x: len(x) > 0, args))
-
- def config(self, config):
- """VES option configuration"""
- for key, value in config.items('config'):
- if key in self.__app_config:
- try:
- if type(self.__app_config[key]) == int:
- value = int(value)
- elif type(self.__app_config[key]) == float:
- value = float(value)
- elif type(self.__app_config[key]) == bool:
- value = bool(distutils.util.strtobool(value))
-
- if isinstance(value, type(self.__app_config[key])):
- self.__app_config[key] = value
- else:
- logging.error("Type mismatch with %s" % key)
- sys.exit()
- except ValueError:
- logging.error("Incorrect value type for %s" % key)
- sys.exit()
- else:
- logging.error("Incorrect key configuration %s" % key)
- sys.exit()
-
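The loader above expects an INI-style file with a single [config] section whose keys match __app_config. The sample below is an assumption based on the defaults shown earlier, not taken from the project's shipped config file; it is parsed the same way main() does further down.

# Assumed sample configuration; key names come from __app_config above.
import io
import ConfigParser

sample = u"""
[config]
Domain = 127.0.0.1
Port = 30000
UseHttps = false
SendEventInterval = 20.0
ApiVersion = 5.1
KafkaBroker = localhost
KafkaPort = 9092
"""
config = ConfigParser.ConfigParser()
config.optionxform = lambda option: option  # keep key case, as in main()
config.readfp(io.StringIO(sample))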
- def init(self):
- """Initialisation"""
- # start the VES timer
- self.start_timer()
-
- def update_cache_value(self, kafka_data):
- """Update value internal collectd cache values or create new one
- Please note, the cache should be locked before using this function"""
- found = False
- if kafka_data['plugin'] not in self.__plugin_data_cache:
- self.__plugin_data_cache[kafka_data['plugin']] = {'vls': []}
- plugin_vl = self.__plugin_data_cache[kafka_data['plugin']]['vls']
-
- for index in range(len(plugin_vl)):
-        # record found, so just update the time and values
- if (plugin_vl[index]['plugin_instance'] == kafka_data['plugin_instance']) and \
- (plugin_vl[index]['type_instance'] == kafka_data['type_instance']) and \
- (plugin_vl[index]['type'] == kafka_data['type']):
- plugin_vl[index]['pre_time'] = plugin_vl[index]['time']
- plugin_vl[index]['time'] = kafka_data['time']
- plugin_vl[index]['pre_values'] = plugin_vl[index]['values']
- plugin_vl[index]['values'] = kafka_data['values']
- plugin_vl[index]['dsnames'] = kafka_data['dsnames']
- plugin_vl[index]['updated'] = True
- found = True
- break
- if not found:
- value = {}
- # create new cache record
- value['plugin_instance'] = kafka_data['plugin_instance']
- value['type_instance'] = kafka_data['type_instance']
- value['values'] = kafka_data['values']
- value['pre_values'] = kafka_data['values']
- value['type'] = kafka_data['type']
- value['time'] = kafka_data['time']
- value['pre_time'] = kafka_data['time']
- value['host'] = kafka_data['host']
- value['dsnames'] = kafka_data['dsnames']
- value['updated'] = True
- self.__plugin_data_cache[kafka_data['plugin']]['vls'].append(value)
- # update plugin interval based on one received in the value
- self.__plugin_data_cache[kafka_data['plugin']]['interval'] = kafka_data['interval']
-
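Each record appended above keeps the current and the previous sample so that rates can be derived later. An illustrative entry (all values fabricated) looks like this:

# Illustrative cache entry as built by update_cache_value(); values are made up.
example_entry = {
    'host': 'compute-0',
    'plugin_instance': 'instance-00000001',
    'type': 'virt_vcpu',
    'type_instance': '0',
    'dsnames': ['ns'],
    'values': [7.5e9],
    'pre_values': [7.0e9],
    'time': 1507625410.0,
    'pre_time': 1507625400.0,
    'updated': True,
}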
- def cache_get_value(self, plugin_name=None, plugin_instance=None,
- type_name=None, type_instance=None, type_names=None, mark_as_read=True):
- """Get cache value by given criteria"""
- ret_list = []
- if plugin_name in self.__plugin_data_cache:
- for val in self.__plugin_data_cache[plugin_name]['vls']:
-                if (type_name is None or type_name == val['type']) and \
-                   (plugin_instance is None or plugin_instance == val['plugin_instance']) and \
-                   (type_instance is None or type_instance == val['type_instance']) and \
-                   (type_names is None or val['type'] in type_names):
- if mark_as_read:
- val['updated'] = False
- ret_list.append(val)
- return ret_list
-
- def shutdown(self):
- """Shutdown method for clean up"""
- self.stop_timer()
-
- def run(self):
- """Consumer JSON data from kafka broker"""
- kafka_server = '%s:%s' % (self.__app_config.get('KafkaBroker'), self.__app_config.get('KafkaPort'))
- consumer = KafkaConsumer('collectd', bootstrap_servers=kafka_server, auto_offset_reset='latest',
- enable_auto_commit=False,
- value_deserializer=lambda m: json.loads(m.decode('ascii')))
-
- for message in consumer:
- for kafka_data in message.value:
- self.lock()
- try:
- # Receive Kafka data and update cache values
- self.update_cache_value(kafka_data)
-
- finally:
- self.unlock()
-
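The consumer above expects every Kafka message to decode to a list of collectd value-lists. A payload of the kind update_cache_value() consumes might look as follows (illustrative sample, field values made up, assuming collectd's write_kafka plugin in JSON format):

# Assumed example payload; keys match what update_cache_value() reads.
sample_message = [
    {
        'host': 'compute-0',
        'plugin': 'virt',
        'plugin_instance': 'instance-00000001',
        'type': 'memory',
        'type_instance': 'total',
        'dsnames': ['value'],
        'values': [2097152.0],
        'time': 1507625400.0,
        'interval': 10.0,
    }
]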
-
-def main():
- # Parsing cmdline options
- parser = argparse.ArgumentParser()
- parser.add_argument("--config", dest="configfile", default=None, help="Specify config file", metavar="FILE")
- parser.add_argument("--loglevel", dest="level", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='WARNING',
- help="Specify log level (default: %(default)s)", metavar="LEVEL")
- parser.add_argument("--logfile", dest="logfile", default='ves_plugin.log',
- help="Specify log file (default: %(default)s)", metavar="FILE")
- args = parser.parse_args()
-
- # Create log file
- logging.basicConfig(filename=args.logfile, format='%(asctime)s %(message)s', level=args.level)
-
- # Create Application Instance
- application_instance = VESApp()
-
- # Read from config file
- if args.configfile:
- config = ConfigParser.ConfigParser()
- config.optionxform = lambda option: option
- config.read(args.configfile)
- # Write Config Values
- application_instance.config(config)
- else:
- logging.warning("No configfile specified, using default options")
-
- # Start timer for interval
- application_instance.init()
-
-    # Read data from Kafka
- try:
- # Kafka consumer & update cache
- application_instance.run()
- except KeyboardInterrupt:
- logging.info(" - Ctrl-C handled, exiting gracefully")
- application_instance.shutdown()
- sys.exit()
- except:
- logging.error("Unknown Error")
-
-
-if __name__ == '__main__':
- main()
diff --git a/baro_tests/collectd.py b/baro_tests/collectd.py
index a002314e..304b87b8 100644
--- a/baro_tests/collectd.py
+++ b/baro_tests/collectd.py
@@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+# Patch on October 10 2017
"""Executing test of plugins"""
@@ -22,7 +23,6 @@ import time
import logging
import config_server
import tests
-import subprocess
from opnfv.deployment import factory
AODH_NAME = 'aodh'
@@ -30,7 +30,7 @@ GNOCCHI_NAME = 'gnocchi'
ID_RSA_SRC = '/root/.ssh/id_rsa'
ID_RSA_DST_DIR = '/root/.ssh'
ID_RSA_DST = ID_RSA_DST_DIR + '/id_rsa'
-APEX_IP = subprocess.check_output("echo $INSTALLER_IP", shell=True)
+APEX_IP = os.getenv("INSTALLER_IP").rstrip('\n')
APEX_USER = 'root'
APEX_USER_STACK = 'stack'
APEX_PKEY = '/root/.ssh/id_rsa'
@@ -173,31 +173,6 @@ class AodhClient(object):
logger.warning('Aodh is not registered in service catalog')
-class SNMPClient(object):
- """Client to request SNMP meters"""
- def __init__(self, conf, compute_node):
- """
- Keyword arguments:
- conf -- ConfigServer instance
- compute_node -- Compute node object
- """
- self.conf = conf
- self.compute_node = compute_node
-
- def get_snmp_metrics(self, compute_node, mib_file, mib_strings):
- snmp_output = {}
- if mib_file is None:
- cmd = "snmpwalk -v 2c -c public localhost IF-MIB::interfaces"
- ip = compute_node.get_ip()
- snmp_output = self.conf.execute_command(cmd, ip)
- else:
- for mib_string in mib_strings:
- snmp_output[mib_string] = self.conf.execute_command(
- "snmpwalk -v2c -m {} -c public localhost {}".format(
- mib_file, mib_string), compute_node.get_ip())
- return snmp_output
-
-
class CSVClient(object):
"""Client to request CSV meters"""
def __init__(self, conf):
@@ -224,31 +199,39 @@ class CSVClient(object):
if compute_name == node.get_dict()['name']:
date = node.run_cmd(
"date '+%Y-%m-%d'")
+ hostname = node.run_cmd('hostname -A')
+ hostname = hostname.split()[0]
metrics = []
for plugin_subdir in plugin_subdirectories:
for meter_category in meter_categories:
stdout1 = node.run_cmd(
"tail -2 /var/lib/collectd/csv/"
- + "{0}.jf.intel.com/{1}/{2}-{3}".format(
- compute_node.get_name(), plugin_subdir,
+ + "{0}/{1}/{2}-{3}".format(
+ hostname, plugin_subdir,
meter_category, date))
stdout2 = node.run_cmd(
"tail -1 /var/lib/collectd/csv/"
- + "{0}.jf.intel.com/{1}/{2}-{3}".format(
- compute_node.get_name(), plugin_subdir,
+ + "{0}/{1}/{2}-{3}".format(
+ hostname, plugin_subdir,
meter_category, date))
- # Storing last two values
+ # Storing last two values
values = stdout1
+ values2 = stdout2
if values is None:
logger.error(
'Getting last two CSV entries of meter category'
+ ' {0} in {1} subdir failed'.format(
meter_category, plugin_subdir))
+ elif values2 is None:
+ logger.error(
+ 'Getting last CSV entries of meter category'
+ + ' {0} in {1} subdir failed'.format(
+ meter_category, plugin_subdir))
else:
values = values.split(',')
old_value = float(values[0])
- stdout2 = stdout2.split(',')
- new_value = float(stdout2[0])
+ values2 = values2.split(',')
+ new_value = float(values2[0])
metrics.append((
plugin_subdir, meter_category, old_value,
new_value))
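With this change the CSV path is derived from the node's own FQDN (first token of `hostname -A`) instead of a hard-coded domain suffix. Roughly, with illustrative values:

# Illustrative: the path read by the updated code above for one meter category.
hostname = 'overcloud-novacompute-0.localdomain'  # first token of `hostname -A`
plugin_subdir = 'hugepages-mm-2048Kb'
meter_category = 'vmpage_number-free'
date = '2017-10-10'
path = '/var/lib/collectd/csv/{0}/{1}/{2}-{3}'.format(
    hostname, plugin_subdir, meter_category, date)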
@@ -272,7 +255,7 @@ def get_csv_categories_for_ipmi(conf, compute_node):
return [category.strip()[:-11] for category in categories]
-def _process_result(compute_node, test, result, results_list):
+def _process_result(compute_node, out_plugin, test, result, results_list, node):
"""Print test result and append it to results list.
Keyword arguments:
@@ -282,13 +265,13 @@ def _process_result(compute_node, test, result, results_list):
"""
if result:
logger.info(
- 'Compute node {0} test case {1} PASSED.'.format(
- compute_node, test))
+            'Test case {2} with {1} PASSED on node {0}.'.format(
+ node, out_plugin, test))
else:
logger.error(
- 'Compute node {0} test case {1} FAILED.'.format(
- compute_node, test))
- results_list.append((compute_node, test, result))
+            'Test case {2} with {1} FAILED on node {0}.'.format(
+ node, out_plugin, test))
+ results_list.append((compute_node, out_plugin, test, result))
def _print_label(label):
@@ -333,22 +316,41 @@ def _print_final_result_of_plugin(
"""
print_line = ''
for id in compute_ids:
- if out_plugins[id] == out_plugin:
- if (id, plugin, True) in results:
+ if out_plugin == 'Gnocchi':
+ if (id, out_plugin, plugin, True) in results:
print_line += ' PASS |'
- elif (id, plugin, False) in results \
- and out_plugins[id] == out_plugin:
+ elif (id, out_plugin, plugin, False) in results:
+ print_line += ' FAIL |'
+ else:
+ print_line += ' NOT EX |'
+ elif out_plugin == 'AODH':
+ if (id, out_plugin, plugin, True) in results:
+ print_line += ' PASS |'
+ elif (id, out_plugin, plugin, False) in results:
+ print_line += ' FAIL |'
+ else:
+ print_line += ' FAIL |'
+ elif out_plugin == 'SNMP':
+ if (id, out_plugin, plugin, True) in results:
+ print_line += ' PASS |'
+ elif (id, out_plugin, plugin, False) in results:
+ print_line += ' FAIL |'
+ else:
+ print_line += ' FAIL |'
+ elif out_plugin == 'CSV':
+ if (id, out_plugin, plugin, True) in results:
+ print_line += ' PASS |'
+ elif (id, out_plugin, plugin, False) in results:
print_line += ' FAIL |'
else:
print_line += ' NOT EX |'
- elif out_plugin == 'Gnocchi':
- print_line += ' NOT EX |'
else:
- print_line += ' NOT EX |'
+ print_line += ' SKIP |'
return print_line
-def print_overall_summary(compute_ids, tested_plugins, results, out_plugins):
+def print_overall_summary(
+ compute_ids, tested_plugins, aodh_plugins, results, out_plugins):
"""Print overall summary table.
Keyword arguments:
@@ -359,7 +361,6 @@ def print_overall_summary(compute_ids, tested_plugins, results, out_plugins):
"""
compute_node_names = ['Node-{}'.format(i) for i in range(
len((compute_ids)))]
- # compute_node_names = ['Node-{}'.format(id) for id in compute_ids]
all_computes_in_line = ''
for compute in compute_node_names:
all_computes_in_line += '| ' + compute + (' ' * (7 - len(compute)))
@@ -377,46 +378,60 @@ def print_overall_summary(compute_ids, tested_plugins, results, out_plugins):
logger.info(line_of_nodes)
logger.info(
'+' + ('-' * 16) + '+' + (('-' * 8) + '+') * len(compute_node_names))
- out_plugins_print = ['Gnocchi']
- if 'SNMP' in out_plugins.values():
- out_plugins_print.append('SNMP')
- if 'AODH' in out_plugins.values():
- out_plugins_print.append('AODH')
- if 'CSV' in out_plugins.values():
- out_plugins_print.append('CSV')
+ out_plugins_print = []
+ out_plugins_print1 = []
+ for key in out_plugins.keys():
+ if 'Gnocchi' in out_plugins[key]:
+ out_plugins_print1.append('Gnocchi')
+ if 'AODH' in out_plugins[key]:
+ out_plugins_print1.append('AODH')
+ if 'SNMP' in out_plugins[key]:
+ out_plugins_print1.append('SNMP')
+ if 'CSV' in out_plugins[key]:
+ out_plugins_print1.append('CSV')
+ for i in out_plugins_print1:
+ if i not in out_plugins_print:
+ out_plugins_print.append(i)
for out_plugin in out_plugins_print:
output_plugins_line = ''
for id in compute_ids:
- out_plugin_result = 'FAIL'
+ out_plugin_result = '----'
if out_plugin == 'Gnocchi':
out_plugin_result = \
- 'PASS' if out_plugins[id] == out_plugin else 'FAIL'
- if out_plugin == 'AODH':
- if out_plugins[id] == out_plugin:
- out_plugin_result = \
- 'PASS' if out_plugins[id] == out_plugin else 'FAIL'
- if out_plugin == 'SNMP':
- if out_plugins[id] == out_plugin:
- out_plugin_result = \
- 'PASS' if out_plugins[id] == out_plugin else 'FAIL'
- if out_plugin == 'CSV':
- if out_plugins[id] == out_plugin:
- out_plugin_result = \
- 'PASS' if [
- plugin for comp_id, plugin, res in results
- if comp_id == id and res] else 'FAIL'
- else:
- out_plugin_result = 'SKIP'
+ 'PASS'
+ elif out_plugin == 'AODH':
+ out_plugin_result = \
+ 'PASS'
+ elif out_plugin == 'SNMP':
+ out_plugin_result = \
+ 'PASS'
+ elif out_plugin == 'CSV':
+ out_plugin_result = \
+ 'PASS' if [
+ plugin for comp_id, out_pl, plugin, res in results
+ if comp_id == id and res] else 'FAIL'
+ else:
+ out_plugin_result = \
+ 'FAIL'
output_plugins_line += '| ' + out_plugin_result + ' '
logger.info(
'| OUT:{}'.format(out_plugin) + (' ' * (11 - len(out_plugin)))
+ output_plugins_line + '|')
- for plugin in sorted(tested_plugins.values()):
- line_plugin = _print_final_result_of_plugin(
- plugin, compute_ids, results, out_plugins, out_plugin)
- logger.info(
- '| IN:{}'.format(plugin) + (' ' * (11-len(plugin)))
- + '|' + line_plugin)
+
+ if out_plugin == 'AODH':
+ for plugin in sorted(aodh_plugins.values()):
+ line_plugin = _print_final_result_of_plugin(
+ plugin, compute_ids, results, out_plugins, out_plugin)
+ logger.info(
+ '| IN:{}'.format(plugin) + (' ' * (11-len(plugin)))
+ + '|' + line_plugin)
+ else:
+ for plugin in sorted(tested_plugins.values()):
+ line_plugin = _print_final_result_of_plugin(
+ plugin, compute_ids, results, out_plugins, out_plugin)
+ logger.info(
+ '| IN:{}'.format(plugin) + (' ' * (11-len(plugin)))
+ + '|' + line_plugin)
logger.info(
'+' + ('-' * 16) + '+'
+ (('-' * 8) + '+') * len(compute_node_names))
@@ -424,8 +439,8 @@ def print_overall_summary(compute_ids, tested_plugins, results, out_plugins):
def _exec_testcase(
- test_labels, name, gnocchi_running, aodh_running, snmp_running,
- controllers, compute_node, conf, results, error_plugins, out_plugins):
+ test_labels, name, out_plugin, controllers, compute_node,
+ conf, results, error_plugins, out_plugins):
"""Execute the testcase.
Keyword arguments:
@@ -453,11 +468,8 @@ def _exec_testcase(
bridge for bridge in ovs_interfaces
if bridge in ovs_configured_bridges]
plugin_prerequisites = {
- 'intel_rdt': [(
- conf.is_libpqos_on_node(compute_node),
- 'libpqos must be installed.')],
'mcelog': [(
- conf.is_installed(compute_node, 'mcelog'),
+ conf.is_mcelog_installed(compute_node, 'mcelog'),
'mcelog must be installed.')],
'ovs_events': [(
len(ovs_existing_configured_int) > 0 or len(ovs_interfaces) > 0,
@@ -466,28 +478,22 @@ def _exec_testcase(
len(ovs_existing_configured_bridges) > 0,
'Bridges must be configured.')]}
gnocchi_criteria_lists = {
- 'hugepages': ['hugepages'],
- 'mcelog': ['mcelog'],
- 'ovs_events': ['interface-ovs-system'],
- 'ovs_stats': ['ovs_stats-br0.br0']}
+ 'hugepages': 'hugepages',
+ 'intel_rdt': 'rdt',
+ 'mcelog': 'mcelog',
+ 'ovs_events': 'interface-ovs-system',
+ 'ovs_stats': 'ovs_stats-br0.br0'}
aodh_criteria_lists = {
- 'mcelog': ['mcelog.errors'],
- 'ovs_events': ['ovs_events.gauge']}
+ 'mcelog': 'mcelog',
+ 'ovs_events': 'ovs_events'}
snmp_mib_files = {
'intel_rdt': '/usr/share/snmp/mibs/Intel-Rdt.txt',
'hugepages': '/usr/share/snmp/mibs/Intel-Hugepages.txt',
'mcelog': '/usr/share/snmp/mibs/Intel-Mcelog.txt'}
snmp_mib_strings = {
- 'intel_rdt': [
- 'INTEL-RDT-MIB::rdtLlc.1',
- 'INTEL-RDT-MIB::rdtIpc.1',
- 'INTEL-RDT-MIB::rdtMbmRemote.1',
- 'INTEL-RDT-MIB::rdtMbmLocal.1'],
- 'hugepages': [
- 'INTEL-HUGEPAGES-MIB::hugepagesPageFree'],
- 'mcelog': [
- 'INTEL-MCELOG-MIB::memoryCorrectedErrors.1',
- 'INTEL-MCELOG-MIB::memoryCorrectedErrors.2']}
+ 'intel_rdt': 'INTEL-RDT-MIB::rdtLlc.1',
+ 'hugepages': 'INTEL-HUGEPAGES-MIB::hugepagesPageFree',
+ 'mcelog': 'INTEL-MCELOG-MIB::memoryCorrectedErrors.1'}
nr_hugepages = int(time.time()) % 10000
snmp_in_commands = {
'intel_rdt': None,
@@ -496,13 +502,10 @@ def _exec_testcase(
'mcelog': '/root/mce-inject_df < /root/corrected'}
csv_subdirs = {
'intel_rdt': [
- 'intel_rdt-{}'.format(core)
- for core in conf.get_plugin_config_values(
- compute_node, 'intel_rdt', 'Cores')],
+ 'intel_rdt-0-2'],
'hugepages': [
'hugepages-mm-2048Kb', 'hugepages-node0-2048Kb',
- 'hugepages-node1-2048Kb', 'hugepages-mm-1048576Kb',
- 'hugepages-node0-1048576Kb', 'hugepages-node1-1048576Kb'],
+ 'hugepages-node1-2048Kb'],
# 'ipmi': ['ipmi'],
'mcelog': [
'mcelog-SOCKET_0_CHANNEL_0_DIMM_any',
@@ -515,19 +518,14 @@ def _exec_testcase(
# compute_node)
csv_meter_categories = {
'intel_rdt': [
- 'bytes-llc', 'ipc', 'memory_bandwidth-local',
- 'memory_bandwidth-remote'],
+ 'bytes-llc', 'ipc'],
'hugepages': ['vmpage_number-free', 'vmpage_number-used'],
# 'ipmi': csv_meter_categories_ipmi,
'mcelog': [
'errors-corrected_memory_errors',
- 'errors-uncorrected_memory_errors',
- 'errors-corrected_memory_errors_in_24h',
- 'errors-uncorrected_memory_errors_in_24h'],
+ 'errors-uncorrected_memory_errors'],
'ovs_stats': [
- 'if_collisions', 'if_dropped', 'if_errors', 'if_packets',
- 'if_rx_errors-crc', 'if_rx_errors-frame', 'if_rx_errors-over',
- 'if_rx_octets', 'if_tx_octets'],
+ 'if_dropped', 'if_errors', 'if_packets'],
'ovs_events': ['gauge-link_status']}
_print_plugin_label(
@@ -541,7 +539,8 @@ def _exec_testcase(
for error in plugin_critical_errors:
logger.error(' * ' + error)
_process_result(
- compute_node.get_id(), test_labels[name], False, results)
+ compute_node.get_id(), out_plugin, test_labels[name], False,
+ results, compute_node.get_name())
else:
plugin_errors = [
error for plugin, error, critical in error_plugins
@@ -563,35 +562,36 @@ def _exec_testcase(
for prerequisite in failed_prerequisites:
logger.error(' * {}'.format(prerequisite))
else:
- if gnocchi_running:
- plugin_interval = conf.get_plugin_interval(compute_node, name)
+ plugin_interval = conf.get_plugin_interval(compute_node, name)
+ if out_plugin == 'Gnocchi':
res = conf.test_plugins_with_gnocchi(
- compute_node.get_id(), plugin_interval, logger,
- criteria_list=gnocchi_criteria_lists[name])
- elif aodh_running:
+ compute_node.get_name(), plugin_interval,
+ logger, criteria_list=gnocchi_criteria_lists[name])
+ if out_plugin == 'AODH':
res = conf.test_plugins_with_aodh(
- compute_node.get_id(), plugin_interval,
- logger, creteria_list=aodh_criteria_lists[name])
- elif snmp_running:
+ compute_node.get_name(), plugin_interval,
+ logger, criteria_list=aodh_criteria_lists[name])
+ if out_plugin == 'SNMP':
res = \
name in snmp_mib_files and name in snmp_mib_strings \
- and tests.test_snmp_sends_data(
- compute_node,
- conf.get_plugin_interval(compute_node, name), logger,
- SNMPClient(conf, compute_node), snmp_mib_files[name],
- snmp_mib_strings[name], snmp_in_commands[name], conf)
- else:
+ and conf.test_plugins_with_snmp(
+ compute_node.get_name(), plugin_interval, logger, name,
+ snmp_mib_files[name], snmp_mib_strings[name],
+ snmp_in_commands[name])
+ if out_plugin == 'CSV':
res = tests.test_csv_handles_plugin_data(
compute_node, conf.get_plugin_interval(compute_node, name),
name, csv_subdirs[name], csv_meter_categories[name],
logger, CSVClient(conf))
+
if res and plugin_errors:
logger.info(
'Test works, but will be reported as failure,'
+ 'because of non-critical errors.')
res = False
_process_result(
- compute_node.get_id(), test_labels[name], res, results)
+ compute_node.get_id(), out_plugin, test_labels[name],
+ res, results, compute_node.get_name())
def get_results_for_ovs_events(
@@ -614,48 +614,48 @@ def create_ovs_bridge():
APEX_USER_STACK,
APEX_PKEY)
nodes = handler.get_nodes()
+ logger.info("Creating OVS bridges on computes nodes")
for node in nodes:
if node.is_compute():
node.run_cmd('sudo ovs-vsctl add-br br0')
node.run_cmd('sudo ovs-vsctl set-manager ptcp:6640')
- logger.info('OVS Bridges created on compute nodes')
+ logger.info('OVS Bridges created on compute nodes')
def mcelog_install():
"""Install mcelog on compute nodes."""
- _print_label('Enabling mcelog on compute nodes')
+ _print_label('Enabling mcelog and OVS bridges on compute nodes')
handler = factory.Factory.get_handler('apex',
APEX_IP,
APEX_USER_STACK,
APEX_PKEY)
nodes = handler.get_nodes()
+ mce_bin = os.path.dirname(os.path.realpath(__file__)) + '/mce-inject_ea'
for node in nodes:
if node.is_compute():
centos_release = node.run_cmd('uname -r')
if '3.10.0-514.26.2.el7.x86_64' not in centos_release:
logger.info(
'Mcelog will not be enabled '
- + 'on node-{0}, '.format(node.get_dict()['id'])
+ + 'on node-{0}, '.format(node.get_dict()['name'])
                        + 'unsupported CentOS release found ({0}).'.format(
centos_release))
else:
logger.info(
'Checking if mcelog is enabled'
- + ' on node-{}...'.format(node.get_dict()['id']))
+ + ' on node-{}...'.format(node.get_dict()['name']))
res = node.run_cmd('ls')
                    if 'mce-inject_ea' in res and 'corrected' in res:
logger.info(
'Mcelog seems to be already installed '
- + 'on node-{}.'.format(node.get_dict()['id']))
+ + 'on node-{}.'.format(node.get_dict()['name']))
node.run_cmd('sudo modprobe mce-inject')
node.run_cmd('sudo ./mce-inject_ea < corrected')
else:
logger.info(
'Mcelog will be enabled on node-{}...'.format(
node.get_dict()['id']))
- node.put_file(
- '/usr/local/lib/python2.7/dist-packages/baro_tests/'
- + 'mce-inject_ea', 'mce-inject_ea')
+ node.put_file(mce_bin, 'mce-inject_ea')
node.run_cmd('chmod a+x mce-inject_ea')
node.run_cmd('echo "CPU 0 BANK 0" > corrected')
node.run_cmd(
@@ -734,43 +734,24 @@ def main(bt_logger=None):
_print_label(
'Display of Control and Compute nodes available in the set up')
- logger.info('controllers: {}'.format([('{0}: {1} ({2})'.format(
- node.get_id(), node.get_name(),
- node.get_ip())) for node in controllers]))
- logger.info('computes: {}'.format([('{0}: {1} ({2})'.format(
- node.get_id(), node.get_name(), node.get_ip()))
- for node in computes]))
+ logger.info('controllers: {}'.format([('{0}: {1}'.format(
+ node.get_name(), node.get_ip())) for node in controllers]))
+ logger.info('computes: {}'.format([('{0}: {1}'.format(
+ node.get_name(), node.get_ip())) for node in computes]))
mcelog_install()
create_ovs_bridge()
gnocchi_running_on_con = False
aodh_running_on_con = False
+ # Disabling SNMP write plug-in
snmp_running = False
- _print_label('Testing Gnocchi, AODH and SNMP on controller nodes')
+ _print_label('Testing Gnocchi and AODH plugins on nodes')
for controller in controllers:
- gnocchi_client = GnocchiClient()
- gnocchi_client.auth_token()
gnocchi_running = (
- gnocchi_running_on_con and conf.is_gnocchi_running(controller))
- aodh_client = AodhClient()
- aodh_client.auth_token()
+ gnocchi_running_on_con or conf.is_gnocchi_running(controller))
aodh_running = (
- aodh_running_on_con and conf.is_aodh_running(controller))
- if gnocchi_running:
- logger.info("Gnocchi is running on controller.")
- elif aodh_running:
- logger.error("Gnocchi is not running on controller.")
- logger.info("AODH is running on controller.")
- elif snmp_running:
- logger.error("Gnocchi is not running on Controller")
- logger.error("AODH is not running on controller.")
- logger.info("SNMP is running on controller.")
- else:
- logger.error("Gnocchi is not running on Controller")
- logger.error("AODH is not running on controller.")
- logger.error("SNMP is not running on controller.")
- logger.info("CSV will be enabled on compute nodes.")
+ aodh_running_on_con or conf.is_aodh_running(controller))
compute_ids = []
compute_node_names = []
@@ -782,118 +763,98 @@ def main(bt_logger=None):
'mcelog': 'Mcelog',
'ovs_stats': 'OVS stats',
'ovs_events': 'OVS events'}
- out_plugins = {
- 'gnocchi': 'Gnocchi',
- 'aodh': 'AODH',
- 'snmp': 'SNMP',
- 'csv': 'CSV'}
+ aodh_plugin_labels = {
+ 'mcelog': 'Mcelog',
+ 'ovs_events': 'OVS events'}
+ out_plugins = {}
for compute_node in computes:
node_id = compute_node.get_id()
node_name = compute_node.get_name()
- out_plugins[node_id] = 'CSV'
+ out_plugins[node_id] = []
compute_ids.append(node_id)
compute_node_names.append(node_name)
plugins_to_enable = []
- _print_label('NODE {}: Test Gnocchi Plug-in'.format(node_name))
- logger.info('Checking if gnocchi plug-in is included in compute nodes.')
- if not conf.check_gnocchi_plugin_included(compute_node):
- logger.error('Gnocchi plug-in is not included.')
- logger.info(
- 'Testcases on node {} will not be executed'.format(node_name))
- else:
- collectd_restarted, collectd_warnings = \
- conf.restart_collectd(compute_node)
- sleep_time = 30
- logger.info(
- 'Sleeping for {} seconds after collectd restart...'.format(
- sleep_time))
- time.sleep(sleep_time)
- if not collectd_restarted:
- for warning in collectd_warnings:
- logger.warning(warning)
+ error_plugins = []
+ gnocchi_running = (
+ gnocchi_running and conf.check_gnocchi_plugin_included(
+ compute_node))
+ aodh_running = (
+ aodh_running and conf.check_aodh_plugin_included(compute_node))
+ # logger.info("SNMP enabled on {}" .format(node_name))
+ if gnocchi_running:
+ out_plugins[node_id].append("Gnocchi")
+ if aodh_running:
+ out_plugins[node_id].append("AODH")
+ if snmp_running:
+ out_plugins[node_id].append("SNMP")
+
+ if 'Gnocchi' in out_plugins[node_id]:
+ plugins_to_enable.append('csv')
+ out_plugins[node_id].append("CSV")
+ if plugins_to_enable:
+ _print_label(
+ 'NODE {}: Enabling Test Plug-in '.format(node_name)
+ + 'and Test case execution')
+ if plugins_to_enable and not conf.enable_plugins(
+ compute_node, plugins_to_enable, error_plugins,
+ create_backup=False):
logger.error(
- 'Restart of collectd on node {} failed'.format(node_name))
+ 'Failed to test plugins on node {}.'.format(node_id))
logger.info(
'Testcases on node {} will not be executed'.format(
- node_name))
- else:
- for warning in collectd_warnings:
- logger.warning(warning)
-
- if gnocchi_running:
- out_plugins[node_id] = 'Gnocchi'
- logger.info("Gnocchi is active and collecting data")
- elif aodh_running:
- out_plugins[node_id] = 'AODH'
- logger.info("AODH withh be tested")
- _print_label('Node {}: Test AODH' .format(node_name))
- logger.info("Checking if AODH is running")
- logger.info("AODH is running")
- elif snmp_running:
- out_plugins[node_id] = 'SNMP'
- logger.info("SNMP will be tested.")
- _print_label('NODE {}: Test SNMP'.format(node_id))
- logger.info("Checking if SNMP is running.")
- logger.info("SNMP is running.")
- else:
- plugins_to_enable.append('csv')
- out_plugins[node_id] = 'CSV'
- logger.error("Gnocchi, AODH, SNMP are not running")
- logger.info(
- "CSV will be enabled for verification "
- + "of test plugins.")
- if plugins_to_enable:
- _print_label(
- 'NODE {}: Enabling Test Plug-in '.format(node_name)
- + 'and Test case execution')
- error_plugins = []
- if plugins_to_enable and not conf.enable_plugins(
- compute_node, plugins_to_enable, error_plugins,
- create_backup=False):
+ node_id))
+
+ for i in out_plugins[node_id]:
+ if i == 'AODH':
+ for plugin_name in sorted(aodh_plugin_labels.keys()):
+ _exec_testcase(
+ aodh_plugin_labels, plugin_name, i,
+ controllers, compute_node, conf, results,
+ error_plugins, out_plugins[node_id])
+ elif i == 'CSV':
+ _print_label("Node {}: Executing CSV Testcases".format(
+ node_name))
+ logger.info("Restarting collectd for CSV tests")
+ collectd_restarted, collectd_warnings = \
+ conf.restart_collectd(compute_node)
+ sleep_time = 10
+ logger.info(
+ 'Sleeping for {} seconds'.format(sleep_time)
+ + ' after collectd restart...')
+ time.sleep(sleep_time)
+ if not collectd_restarted:
+ for warning in collectd_warnings:
+ logger.warning(warning)
logger.error(
- 'Failed to test plugins on node {}.'.format(node_id))
+ 'Restart of collectd on node {} failed'.format(
+ compute_node))
logger.info(
- 'Testcases on node {} will not be executed'.format(
- node_id))
- else:
- if plugins_to_enable:
- collectd_restarted, collectd_warnings = \
- conf.restart_collectd(compute_node)
- sleep_time = 30
- logger.info(
- 'Sleeping for {} seconds'.format(sleep_time)
- + ' after collectd restart...')
- time.sleep(sleep_time)
- if plugins_to_enable and not collectd_restarted:
- for warning in collectd_warnings:
- logger.warning(warning)
- logger.error(
- 'Restart of collectd on node {} failed'.format(
- node_id))
- logger.info(
- 'Testcases on node {}'.format(node_id)
- + ' will not be executed.')
- else:
- if collectd_warnings:
- for warning in collectd_warnings:
- logger.warning(warning)
-
- for plugin_name in sorted(plugin_labels.keys()):
- _exec_testcase(
- plugin_labels, plugin_name, gnocchi_running,
- aodh_running, snmp_running, controllers,
- compute_node, conf, results, error_plugins,
- out_plugins[node_id])
-
- # _print_label('NODE {}: Restoring config file'.format(node_name))
- # conf.restore_config(compute_node)
- mcelog_delete()
- print_overall_summary(compute_ids, plugin_labels, results, out_plugins)
-
- if ((len([res for res in results if not res[2]]) > 0)
- or (len(results) < len(computes) * len(plugin_labels))):
- logger.error('Some tests have failed or have not been executed')
- return 1
+ 'CSV Testcases on node {}'.format(compute_node)
+ + ' will not be executed.')
+ for plugin_name in sorted(plugin_labels.keys()):
+ _exec_testcase(
+ plugin_labels, plugin_name, i,
+ controllers, compute_node, conf, results,
+ error_plugins, out_plugins[node_id])
+
+ else:
+ for plugin_name in sorted(plugin_labels.keys()):
+ _exec_testcase(
+ plugin_labels, plugin_name, i,
+ controllers, compute_node, conf, results,
+ error_plugins, out_plugins[node_id])
+
+ mcelog_delete()
+ print_overall_summary(
+ compute_ids, plugin_labels, aodh_plugin_labels, results, out_plugins)
+
+ for res in results:
+        if res[3] is False or res[3] is None:
+ logger.error('Some tests have failed or have not been executed')
+ logger.error('Overall Result is Fail')
+ return 1
+ else:
+ pass
return 0
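After this change every entry in results is a four-item tuple appended by _process_result(), which is why the final check above indexes res[3]. An illustrative example (node ids and outcomes made up):

# Illustrative results list: (compute node id, output plugin, test label, result).
results = [
    (1, 'Gnocchi', 'Hugepages', True),
    (1, 'CSV', 'Mcelog', False),
]
failed = [res for res in results if res[3] is False or res[3] is None]
exit_code = 1 if failed else 0  # mirrors the loop above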
diff --git a/baro_tests/config_server.py b/baro_tests/config_server.py
index f156fcf7..f35f7882 100644
--- a/baro_tests/config_server.py
+++ b/baro_tests/config_server.py
@@ -19,7 +19,6 @@ import time
import os.path
import os
import re
-import subprocess
from opnfv.deployment import factory
ID_RSA_PATH = '/root/.ssh/id_rsa'
SSH_KEYS_SCRIPT = '/home/opnfv/barometer/baro_utils/get_ssh_keys.sh'
@@ -28,7 +27,7 @@ COLLECTD_CONF = '/etc/collectd.conf'
COLLECTD_CONF_DIR = '/etc/collectd/collectd.conf.d'
NOTIFICATION_FILE = '/var/log/python-notifications.dump'
COLLECTD_NOTIFICATION = '/etc/collectd_notification_dump.py'
-APEX_IP = subprocess.check_output("echo $INSTALLER_IP", shell=True)
+APEX_IP = os.getenv("INSTALLER_IP").rstrip('\n')
APEX_USER = 'root'
APEX_USER_STACK = 'stack'
APEX_PKEY = '/root/.ssh/id_rsa'
@@ -101,20 +100,20 @@ class ConfigServer(object):
stderr_lines = stderr.readlines()
if stderr_lines:
self.__logger.warning(
- "'fuel node' command failed (try {}):".format(attempt))
+ "'Apex node' command failed (try {}):".format(attempt))
for line in stderr_lines:
self.__logger.debug(line.strip())
else:
fuel_node_passed = True
if attempt > 1:
self.__logger.info(
- "'fuel node' command passed (try {})".format(attempt))
+ "'Apex node' command passed (try {})".format(attempt))
attempt += 1
if not fuel_node_passed:
self.__logger.error(
- "'fuel node' command failed. This was the last try.")
+ "'Apex node' command failed. This was the last try.")
raise OSError(
- "'fuel node' command failed. This was the last try.")
+ "'Apex node' command failed. This was the last try.")
node_table = stdout.readlines()\
# skip table title and parse table values
@@ -184,9 +183,10 @@ class ConfigServer(object):
if compute_name == node.get_dict()['name']:
stdout = node.run_cmd(
'cat /etc/collectd/collectd.conf.d/{}.conf'.format(plugin))
+ if stdout is None:
+ return default_interval
for line in stdout.split('\n'):
if 'Interval' in line:
- # line = line.strip('Interval')
return 1
return default_interval
@@ -206,6 +206,8 @@ class ConfigServer(object):
if compute_name == node.get_dict()['name']:
stdout = node.run_cmd(
'cat /etc/collectd/collectd.conf.d/{}.conf' .format(plugin))
+ if stdout is None:
+ return default_values
for line in stdout.split('\n'):
if 'Interfaces' in line:
return line.split(' ', 1)[1]
@@ -257,28 +259,49 @@ class ConfigServer(object):
Return boolean value whether Gnocchi is running.
"""
gnocchi_present = False
- lines = self.execute_command(
- 'source overcloudrc.v3;systemctl status openstack-gnocchi-api | '
- + 'grep running', controller.get_ip())
- for line in lines:
- if '(running)' in line:
- gnocchi_present = True
+ controller_name = controller.get_name()
+ nodes = get_apex_nodes()
+ for node in nodes:
+ if controller_name == node.get_dict()['name']:
+ node.put_file(
+ '/home/opnfv/functest/conf/openstack.creds',
+ 'overcloudrc.v3')
+ stdout = node.run_cmd(
+ "source overcloudrc.v3;"
+ + "openstack catalog list | grep gnocchi")
+ if stdout is None:
+ return False
+ elif 'gnocchi' in stdout:
+ gnocchi_present = True
+ return gnocchi_present
+ else:
+ return False
return gnocchi_present
def is_aodh_running(self, controller):
"""Check whether aodh service is running on controller
"""
aodh_present = False
- lines = self.execute_command(
- 'source overcloudrc.v3;systemctl openstack-aodh-api | grep running',
- controller.get_ip())
- for line in lines:
- self.__logger.info("Line = {}" .format(line))
- if '(running)' in line:
- aodh_present = True
+ controller_name = controller.get_name()
+ nodes = get_apex_nodes()
+ for node in nodes:
+ if controller_name == node.get_dict()['name']:
+ node.put_file(
+ '/home/opnfv/functest/conf/openstack.creds',
+ 'overcloudrc.v3')
+ stdout = node.run_cmd(
+ "source overcloudrc.v3;"
+ + "openstack catalog list | grep aodh")
+ if stdout is None:
+ return False
+ elif 'aodh' in stdout:
+ aodh_present = True
+ return aodh_present
+ else:
+ return False
return aodh_present
- def is_installed(self, compute, package):
+ def is_mcelog_installed(self, compute, package):
"""Check whether package exists on compute node.
Keyword arguments:
@@ -292,8 +315,10 @@ class ConfigServer(object):
for node in nodes:
if compute_name == node.get_dict()['name']:
stdout = node.run_cmd(
- 'yum list installed | grep mcelog')
- if 'mcelog' in stdout:
+ 'rpm -qa | grep mcelog')
+ if stdout is None:
+ return 0
+ elif 'mcelog' in stdout:
return 1
else:
return 0
@@ -310,6 +335,32 @@ class ConfigServer(object):
return True
return False
+ def check_aodh_plugin_included(self, compute):
+ """Check if aodh plugin is included in collectd.conf file.
+ If not, try to enable it.
+
+ Keyword arguments:
+ compute -- compute node instance
+
+ Return boolean value whether AODH plugin is included
+        or its enabling was successful.
+ """
+ compute_name = compute.get_name()
+ nodes = get_apex_nodes()
+ for node in nodes:
+ if compute_name == node.get_dict()['name']:
+ aodh_conf = node.run_cmd('ls /etc/collectd/collectd.conf.d')
+ if 'aodh.conf' not in aodh_conf:
+ self.__logger.info(
+ "AODH Plugin not included in {}".format(compute_name))
+ return False
+ else:
+ self.__logger.info(
+ "AODH plugin present in compute node {}" .format(
+ compute_name))
+ return True
+ return True
+
def check_gnocchi_plugin_included(self, compute):
"""Check if gnocchi plugin is included in collectd.conf file.
If not, try to enable it.
@@ -324,16 +375,37 @@ class ConfigServer(object):
nodes = get_apex_nodes()
for node in nodes:
if compute_name == node.get_dict()['name']:
- # node.run_cmd('su; "opnfvapex"')
gnocchi_conf = node.run_cmd('ls /etc/collectd/collectd.conf.d')
if 'collectd-ceilometer-plugin.conf' not in gnocchi_conf:
- self.__logger.info("Gnocchi Plugin not included")
- return True
+ self.__logger.info(
+ "Gnocchi Plugin not included in node {}".format(
+ compute_name))
+ return False
else:
- self.__logger.info("Gnochi plugin present")
+ self.__logger.info(
+ "Gnocchi plugin available in compute node {}" .format(
+ compute_name))
return True
return True
+ def check_snmp_plugin_included(self, compute):
+ """Check if SNMP plugin is active in compute node.
+ """
+ snmp_mib = '/usr/share/snmp/mibs/Intel-Rdt.txt'
+ snmp_string = 'INTEL-RDT-MIB::intelRdt'
+ compute_name = compute.get_name()
+ nodes = get_apex_nodes()
+ for node in nodes:
+ if compute_name == node.get_dict()['name']:
+ stdout = node.run_cmd(
+ 'snmpwalk -v2c -m {0} -c public localhost {1}' .format(
+ snmp_mib, snmp_string))
+ self.__logger.info("snmp output = {}" .format(stdout))
+ if 'OID' in stdout:
+ return False
+ else:
+ return True
+
def enable_plugins(
self, compute, plugins, error_plugins, create_backup=True):
"""Enable plugins on compute node
@@ -341,43 +413,21 @@ class ConfigServer(object):
Keyword arguments:
compute -- compute node instance
plugins -- list of plugins to be enabled
- error_plugins -- list of tuples with found errors, new entries
- may be added there (plugin, error_description, is_critical):
- plugin -- plug-in name
- error_decription -- description of the error
- is_critical -- boolean value indicating whether error
- is critical
- create_backup -- boolean value indicating whether backup
- shall be created
Return boolean value indicating whether function was successful.
"""
+ csv_file = os.path.dirname(os.path.realpath(__file__)) + '/csv.conf'
plugins = sorted(plugins)
compute_name = compute.get_name()
nodes = get_apex_nodes()
for node in nodes:
if compute_name == node.get_dict()['name']:
- node.put_file(
- '/usr/local/lib/python2.7/dist-packages/baro_tests/'
- + 'csv.conf', 'csv.conf')
+ node.put_file(csv_file, 'csv.conf')
node.run_cmd(
'sudo cp csv.conf '
+ '/etc/collectd/collectd.conf.d/csv.conf')
return True
- def restore_config(self, compute):
- """Restore collectd config file from backup on compute node.
-
- Keyword arguments:
- compute -- compute node instance
- """
- ssh, sftp = self.__open_sftp_session(
- compute.get_ip(), 'root', 'opnfvapex')
-
- self.__logger.info('Restoring config file from backup...')
- ssh.exec_command("cp {0} {0}.used".format(COLLECTD_CONF))
- ssh.exec_command("cp {0}.backup {0}".format(COLLECTD_CONF))
-
def restart_collectd(self, compute):
"""Restart collectd on compute node.
@@ -419,142 +469,175 @@ class ConfigServer(object):
return False, warning
return True, warning
- def test_gnocchi_is_sending_data(self, controller):
- """ Checking if Gnocchi is sending metrics to controller"""
- metric_ids = []
- timestamps1 = {}
- timestamps2 = {}
- ssh, sftp = self.__open_sftp_session(
- controller.get_ip(), 'root', 'opnfvapex')
-
- self.__logger.info('Getting gnocchi metric list on{}'.format(
- controller.get_name()))
- stdout = self.execute_command(
- "source overcloudrc.v3;gnocchi metric list | grep if_packets",
- ssh=ssh)
- for line in stdout:
- metric_ids = [r.split('|')[1] for r in stdout]
- self.__logger.info("Metric ids = {}" .format(metric_ids))
- for metric_id in metric_ids:
- metric_id = metric_id.replace("u", "")
- stdout = self.execute_command(
- "source overcloudrc.v3;gnocchi measures show {}" .format(
- metric_id), ssh=ssh)
- self.__logger.info("stdout measures ={}" .format(stdout))
- for line in stdout:
- if line[0] == '+':
- pass
- else:
- self.__logger.info("Line = {}" .format(line))
- timestamps1 = [line.split('|')[1]]
- self.__logger.info("Last line timetamp1 = {}" .format(timestamps1))
- time.sleep(10)
- stdout = self.execute_command(
- "source overcloudrc.v3;gnocchi measures show {}" .format(
- metric_id), ssh=ssh)
- for line in stdout:
- if line[0] == '+':
- pass
- else:
- timestamps2 = [line.split('|')[1]]
- self.__logger.info("Last line timetamp2 = {}" .format(timestamps2))
- if timestamps1 == timestamps2:
- self.__logger.info("False")
- # return False
- return True
- else:
- self.__logger.info("True")
- return True
+ def test_plugins_with_aodh(
+ self, compute, plugin_interval, logger,
+ criteria_list=[]):
- def test_plugins_with_aodh(self, controller):
- """Checking if AODH is sending metrics to controller"""
- metric_ids = []
+ metric_id = {}
timestamps1 = {}
timestamps2 = {}
- ssh, sftp = self.__open_sftp_session(
- controller.get_ip(), 'root', 'opnfvapex')
- self.__logger.info('Getting AODH alarm list on{}'.format(
- controller.get_name()))
- stdout = self.execute_command(
- "source overcloudrc.v3;aodh alarm list | grep mcelog",
- ssh=ssh)
- for line in stdout:
- metric_ids = [r.split('|')[1] for r in stdout]
- self.__logger.info("Metric ids = {}" .format(metric_ids))
- for metric_id in metric_ids:
- metric_id = metric_id.replace("u", "")
- stdout = self.execute_command(
- "source overcloudrc.v3;aodh alarm show {}" .format(
- metric_id), ssh=ssh)
- self.__logger.info("stdout alarms ={}" .format(stdout))
- for line in stdout:
- if line[0] == '+':
- pass
- else:
- self.__logger.info("Line = {}" .format(line))
- timestamps1 = [line.split('|')[1]]
- self.__logger.info("Last line timetamp1 = {}" .format(timestamps1))
- time.sleep(10)
- stdout = self.execute_command(
- "source overcloudrc.v3;aodh alarm show {}" .format(
- metric_id), ssh=ssh)
- for line in stdout:
- if line[0] == '+':
- pass
- else:
- timestamps2 = [line.split('|')[1]]
- self.__logger.info("Last line timetamp2 = {}" .format(timestamps2))
- if timestamps1 == timestamps2:
- self.__logger.info("False")
- # return False
- return True
- else:
- self.__logger.info("True")
- return True
+ nodes = get_apex_nodes()
+ for node in nodes:
+ if node.is_controller():
+ self.__logger.info('Getting AODH Alarm list on {}' .format(
+ (node.get_dict()['name'])))
+ node.put_file(
+ '/home/opnfv/functest/conf/openstack.creds',
+ 'overcloudrc.v3')
+ stdout = node.run_cmd(
+ "source overcloudrc.v3;"
+ + "aodh alarm list | grep {0} | grep {1}"
+ .format(criteria_list, compute))
+ if stdout is None:
+ self.__logger.info("aodh alarm list was empty")
+ return False
+ for line in stdout.splitlines():
+ line = line.replace('|', "")
+ metric_id = line.split()[0]
+ stdout = node.run_cmd(
+ 'source overcloudrc.v3; aodh alarm show {}' .format(
+ metric_id))
+ if stdout is None:
+ self.__logger.info("aodh alarm list was empty")
+ return False
+ for line in stdout.splitlines()[3: -1]:
+ line = line.replace('|', "")
+ if line.split()[0] == 'timestamp':
+ timestamps1 = line.split()[1]
+ else:
+ pass
+ time.sleep(12)
+ stdout = node.run_cmd(
+ "source overcloudrc.v3; aodh alarm show {}" .format(
+ metric_id))
+ if stdout is None:
+ self.__logger.info("aodh alarm list was empty")
+ return False
+ for line in stdout.splitlines()[3:-1]:
+ line = line.replace('|', "")
+ if line.split()[0] == 'timestamp':
+ timestamps2 = line.split()[1]
+ else:
+ pass
+ if timestamps1 == timestamps2:
+ self.__logger.info(
+ "Data not updated after interval of 12 seconds")
+ return False
+ else:
+ self.__logger.info("PASS")
+ return True
def test_plugins_with_gnocchi(
- self, controller, compute_node, plugin_interval, logger,
+ self, compute, plugin_interval, logger,
criteria_list=[]):
- metric_ids = []
+ metric_id = {}
timestamps1 = {}
timestamps2 = {}
- ssh, sftp = self.__open_sftp_session(
- controller.get_ip(), 'root', 'opnfvapex')
- self.__logger.info('Getting gnocchi metric list on{}'.format(
- controller.get_name()))
- stdout = self.execute_command(
- "source overcloudrc.v3;gnocchi metric list | grep {0} | grep {1}"
- .format(compute_node.get_name(), criteria_list), ssh=ssh)
- for line in stdout:
- metric_ids = [r.split('|')[1] for r in stdout]
- self.__logger.info("Metric ids = {}" .format(metric_ids))
- for metric_id in metric_ids:
- metric_id = metric_id.replace("u", "")
- stdout = self.execute_command(
- "source overcloudrc.v3;gnocchi measures show {}" .format(
- metric_id), ssh=ssh)
- self.__logger.info("stdout measures ={}" .format(stdout))
- for line in stdout:
- if line[0] == '+':
- pass
- else:
- self.__logger.info("Line = {}" .format(line))
- timestamps1 = [line.split('|')[1]]
- self.__logger.info("Last line timetamp1 = {}" .format(timestamps1))
- time.sleep(10)
- stdout = self.execute_command(
- "source overcloudrc.v3;gnocchi measures show {}" .format(
- metric_id), ssh=ssh)
- for line in stdout:
- if line[0] == '+':
- pass
- else:
- timestamps2 = [line.split('|')[1]]
- self.__logger.info("Last line timetamp2 = {}" .format(timestamps2))
- if timestamps1 == timestamps2:
- self.__logger.info("False")
- return False
- else:
- self.__logger.info("True")
- return True
+ nodes = get_apex_nodes()
+ sleep_time = plugin_interval + 2
+ for node in nodes:
+ if node.is_controller():
+ self.__logger.info('Getting gnocchi metric list on {}' .format(
+ (node.get_dict()['name'])))
+ node.put_file(
+ '/home/opnfv/functest/conf/openstack.creds',
+ 'overcloudrc.v3')
+ stdout = node.run_cmd(
+ "source overcloudrc.v3;"
+ + "gnocchi metric list | grep {0} | grep {1}"
+ .format(criteria_list, compute))
+ if stdout is None:
+ self.__logger.info("gnocchi list was empty")
+ return False
+ for line in stdout.splitlines():
+ line = line.replace('|', "")
+ metric_id = line.split()[0]
+ stdout = node.run_cmd(
+ 'source overcloudrc.v3;gnocchi measures show {}'.format(
+ metric_id))
+ if stdout is None:
+ self.__logger.info("gnocchi list was empty")
+ return False
+ for line in stdout.splitlines()[3: -1]:
+ if line[0] == '+':
+ pass
+ else:
+ timestamps1 = line.replace('|', "")
+ timestamps1 = timestamps1.split()[0]
+ time.sleep(sleep_time)
+ stdout = node.run_cmd(
+ "source overcloudrc.v3;gnocchi measures show {}".format(
+ metric_id))
+ if stdout is None:
+ self.__logger.info("gnocchi measures was empty")
+ return False
+ for line in stdout.splitlines()[3:-1]:
+ if line[0] == '+':
+ pass
+ else:
+ timestamps2 = line.replace('|', "")
+ timestamps2 = timestamps2.split()[0]
+ if timestamps1 == timestamps2:
+ self.__logger.info(
+ "Plugin Interval is {}" .format(plugin_interval))
+ self.__logger.info(
+ "Data not updated after {} seconds".format(
+ sleep_time))
+ return False
+ else:
+ self.__logger.info("PASS")
+ return True
+ return False
+
+ def test_plugins_with_snmp(
+ self, compute, plugin_interval, logger, plugin, snmp_mib_files=[],
+ snmp_mib_strings=[], snmp_in_commands=[]):
+
+        if plugin in ('hugepages', 'intel_rdt', 'mcelog'):
+ nodes = get_apex_nodes()
+ for node in nodes:
+ if compute == node.get_dict()['name']:
+ stdout = node.run_cmd(
+ 'snmpwalk -v2c -m {0} -c public localhost {1}' .format(
+ snmp_mib_files, snmp_mib_strings))
+ self.__logger.info("{}" .format(stdout))
+ if stdout is None:
+ self.__logger.info("No output from snmpwalk")
+ return False
+ elif 'OID' in stdout:
+ self.__logger.info("SNMP query failed")
+ return False
+ else:
+ counter1 = stdout.split()[3]
+ time.sleep(10)
+ stdout = node.run_cmd(
+ 'snmpwalk -v2c -m {0} -c public localhost {1}' .format(
+ snmp_mib_files, snmp_mib_strings))
+ self.__logger.info("{}" .format(stdout))
+                    if stdout is None:
+                        self.__logger.info("No output from snmpwalk")
+                        return False
+ elif 'OID' in stdout:
+ self.__logger.info(
+ "SNMP query failed during second check")
+ self.__logger.info("waiting for 10 sec")
+ time.sleep(10)
+ stdout = node.run_cmd(
+ 'snmpwalk -v2c -m {0} -c public localhost {1}' .format(
+ snmp_mib_files, snmp_mib_strings))
+ self.__logger.info("{}" .format(stdout))
+                        if stdout is None:
+                            self.__logger.info("No output from snmpwalk")
+                            return False
+ elif 'OID' in stdout:
+ self.__logger.info("SNMP query failed again")
+ self.__logger.info("Failing this test case")
+ return False
+                        else:
+                            counter2 = stdout.split()[3]
+                    else:
+                        counter2 = stdout.split()[3]
+
+ if counter1 == counter2:
+ return False
+ else:
+ return True
+ else:
+ return False
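Both new helpers follow the same "sample twice and compare" pattern: read a
value produced by a collectd plugin, wait at least one plugin interval, read
it again, and pass only if the value changed. Below is a minimal standalone
sketch of that pattern (not part of this patch); ``run_query`` and ``parse``
are hypothetical stand-ins for ``node.run_cmd()`` and the column extraction
done in the tests above.

.. code:: python

    import time

    def value_updates(run_query, plugin_interval,
                      parse=lambda out: out.split()[3]):
        """Return True if the sampled value changes within one interval.

        run_query -- callable returning command output as a string or None,
                     e.g. a wrapper around 'gnocchi measures show' or 'snmpwalk'.
        parse     -- extracts the field to compare (timestamp, counter, ...).
        """
        first = run_query()
        if first is None:
            return False                   # no output at all: fail early
        sample1 = parse(first)

        time.sleep(plugin_interval + 2)    # a little longer than one interval

        second = run_query()
        if second is None:
            return False
        sample2 = parse(second)

        # an unchanged value means the plugin stopped producing new samples
        return sample1 != sample2

The gnocchi helper applies this with measure timestamps, the SNMP helper with
counter values from ``snmpwalk`` output.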
diff --git a/docs/release/configguide/index.rst b/docs/release/configguide/index.rst
index 6305e357..57ff2787 100644
--- a/docs/release/configguide/index.rst
+++ b/docs/release/configguide/index.rst
@@ -1,3 +1,5 @@
+.. _barometer-configguide:
+
.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. http://creativecommons.org/licenses/by/4.0
.. Copyright (c) 2017 Open Platform for NFV Project, Inc. and its contributors
diff --git a/docs/release/release-notes/index.rst b/docs/release/release-notes/index.rst
index 44713cd3..db8221ab 100644
--- a/docs/release/release-notes/index.rst
+++ b/docs/release/release-notes/index.rst
@@ -1,3 +1,5 @@
+.. _barometer-releasenotes:
+
.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. http://creativecommons.org/licenses/by/4.0
.. (c) OPNFV, Intel Corporation and others.
diff --git a/docs/release/scenarios/os-nosdn-bar-ha/index.rst b/docs/release/scenarios/os-nosdn-bar-ha/index.rst
new file mode 100644
index 00000000..d1b18639
--- /dev/null
+++ b/docs/release/scenarios/os-nosdn-bar-ha/index.rst
@@ -0,0 +1,15 @@
+.. _os-nosdn-bar-ha:
+
+.. This work is licensed under a Creative Commons Attribution 4.0 International License.
+.. http://creativecommons.org/licenses/by/4.0
+.. (c) <optionally add copyright holder's name>
+
+========================================
+os-nosdn-bar-ha overview and description
+========================================
+
+.. toctree::
+ :numbered:
+ :maxdepth: 4
+
+ scenario.description.rst
diff --git a/docs/release/scenarios/os-nosdn-bar-noha/index.rst b/docs/release/scenarios/os-nosdn-bar-noha/index.rst
new file mode 100644
index 00000000..92851afa
--- /dev/null
+++ b/docs/release/scenarios/os-nosdn-bar-noha/index.rst
@@ -0,0 +1,15 @@
+.. _os-nosdn-bar-noha:
+
+.. This work is licensed under a Creative Commons Attribution 4.0 International License.
+.. http://creativecommons.org/licenses/by/4.0
+.. (c) <optionally add copyright holder's name>
+
+==========================================
+os-nosdn-bar-noha overview and description
+==========================================
+
+.. toctree::
+ :numbered:
+ :maxdepth: 4
+
+ scenario.description.rst
diff --git a/docs/release/userguide/feature.userguide.rst b/docs/release/userguide/feature.userguide.rst
index 099d8e27..d61af7cd 100644
--- a/docs/release/userguide/feature.userguide.rst
+++ b/docs/release/userguide/feature.userguide.rst
@@ -321,7 +321,7 @@ To configure some hugepages:
$ sudo mkdir -p /mnt/huge
$ sudo mount -t hugetlbfs nodev /mnt/huge
- $ sudo echo 14336 > /sys/devices/system/node/node0/hugepages/hugepages-2048kB/nr_hugepages
+ $ sudo bash -c "echo 14336 > /sys/devices/system/node/node0/hugepages/hugepages-2048kB/nr_hugepages"
Building and installing collectd:
@@ -514,41 +514,24 @@ Remove old version of OpenIPMI library:
$ sudo yum remove OpenIPMI ipmitool
-Download OpenIPMI library sources:
+Build and install OpenIPMI library:
.. code:: bash
$ git clone https://git.code.sf.net/p/openipmi/code openipmi-code
$ cd openipmi-code
-
-Patch the OpenIPMI pkg-config file to provide correct compilation flags
-for collectd IPMI plugin:
-
-.. code:: diff
-
- diff --git a/OpenIPMIpthread.pc.in b/OpenIPMIpthread.pc.in
- index 59b52e5..fffa0d0 100644
- --- a/OpenIPMIpthread.pc.in
- +++ b/OpenIPMIpthread.pc.in
- @@ -6,6 +6,6 @@ includedir=@includedir@
- Name: OpenIPMIpthread
- Description: Pthread OS handler for OpenIPMI
- Version: @VERSION@
- -Requires: OpenIPMI pthread
- +Requires: OpenIPMI
- Libs: -L${libdir} -lOpenIPMIutils -lOpenIPMIpthread
- -Cflags: -I${includedir}
- +Cflags: -I${includedir} -pthread
-
-Build and install OpenIPMI library:
-
-.. code:: bash
-
$ autoreconf --install
$ ./configure --prefix=/usr
$ make
$ sudo make install
+Add the directory containing ``OpenIPMI*.pc`` files to the ``PKG_CONFIG_PATH``
+environment variable:
+
+.. code:: bash
+
+ export PKG_CONFIG_PATH=/usr/lib/pkgconfig
+
Enable IPMI support in the kernel:
.. code:: bash
@@ -924,9 +907,11 @@ On Centos 7:
.. code:: bash
$ sudo yum install net-snmp net-snmp-libs net-snmp-utils net-snmp-devel
- $ systemctl start snmpd.service
+ $ sudo systemctl start snmpd.service
-Or build from source
+Then go to the `snmp configuration`_ steps below.
+
+Or, to build from source:
Clone and build net-snmp:
@@ -967,12 +952,14 @@ Configure snmpd as a service:
$ systemctl enable snmpd.service
$ systemctl start snmpd.service
+.. _`snmp configuration`:
+
Add the following line to snmpd.conf configuration file
-"/usr/share/snmp/snmpd.conf" to make all OID tree visible for SNMP clients:
+``/etc/snmp/snmpd.conf`` to make the whole OID tree visible to SNMP clients:
.. code:: bash
- view systemonly included .1
+ view systemview included .1
To verify that SNMP is working you can get IF-MIB table using SNMP client
to view the list of Linux interfaces:
@@ -981,11 +968,28 @@ to view the list of Linux interfaces:
$ snmpwalk -v 2c -c public localhost IF-MIB::interfaces
+Get the default MIB location:
+
+.. code:: bash
+
+ $ net-snmp-config --default-mibdirs
+ /opt/stack/.snmp/mibs:/usr/share/snmp/mibs
+
+Install the Intel specific MIBs (if needed) into one of the locations reported
+by the ``net-snmp-config`` command (e.g. ``/usr/share/snmp/mibs``):
+
+.. code:: bash
+
+ $ git clone https://gerrit.opnfv.org/gerrit/barometer.git
+ $ sudo cp -f barometer/mibs/*.txt /usr/share/snmp/mibs/
+ $ sudo systemctl restart snmpd.service
+
Clone and install the collectd snmp_agent plugin:
.. code:: bash
- $ git clone https://github.com/maryamtahhan/collectd
+ $ cd ~
+ $ git clone https://github.com/maryamtahhan/collectd
$ cd collectd
$ git checkout feat_snmp
$ ./build.sh
@@ -1013,6 +1017,15 @@ memAvailReal OID to value represented as free memory type of memory plugin:
</Data>
</Plugin>
+
+The ``snmpwalk`` command can be used to validate the collectd configuration:
+
+.. code:: bash
+
+ $ snmpwalk -v 2c -c public localhost 1.3.6.1.4.1.2021.4.6.0
+ UCD-SNMP-MIB::memAvailReal.0 = INTEGER: 135237632 kB
+
+
**Limitations**
* Object instance with Counter64 type is not supported in SNMPv1. When GetNext
diff --git a/docs/release/userguide/index.rst b/docs/release/userguide/index.rst
index 287b16be..a6ec261f 100644
--- a/docs/release/userguide/index.rst
+++ b/docs/release/userguide/index.rst
@@ -1,3 +1,5 @@
+.. _barometer-userguide:
+
.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. http://creativecommons.org/licenses/by/4.0
.. (c) Intel and OPNFV
diff --git a/src/Makefile b/src/Makefile
index 382acefc..e7a472c1 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -38,6 +38,6 @@ SUBDIRS += dpdk
SUBDIRS += libpqos
SUBDIRS += pmu-tools
SUBDIRS += collectd
-SUBDIRS += collectd-ceilometer-plugin
+SUBDIRS += collectd-openstack-plugins
include mk/make-subsys.mk
diff --git a/src/collectd-ceilometer-plugin/Makefile b/src/collectd-openstack-plugins/Makefile
index a8b8f938..96bbebb8 100644
--- a/src/collectd-ceilometer-plugin/Makefile
+++ b/src/collectd-openstack-plugins/Makefile
@@ -25,7 +25,7 @@ include ../package-list.mk
.PHONY: force_make install
-WORK_DIR = collectd-ceilometer-plugin
+WORK_DIR = collectd-openstack-plugins
TAG_DONE_FLAG = $(WORK_DIR)/.$(COLLECTD_OPENSTACK_TAG).tag.done
all: force_make install
@@ -59,5 +59,5 @@ sanity:
@echo "Make sanity in $(WORK_DIR) (stub) "
$(WORK_DIR):
- $(AT)git clone $(COLLECTD_OPENSTACK_URL)
+ $(AT)git clone $(COLLECTD_OPENSTACK_URL) $(WORK_DIR)
diff --git a/src/package-list.mk b/src/package-list.mk
index 762b4230..e857dcf8 100644
--- a/src/package-list.mk
+++ b/src/package-list.mk
@@ -19,4 +19,4 @@ COLLECTD_URL ?= https://github.com/collectd/collectd
COLLECTD_TAG ?= master
COLLECTD_OPENSTACK_URL ?= https://github.com/openstack/collectd-ceilometer-plugin
-COLLECTD_OPENSTACK_TAG ?= stable/ocata
+COLLECTD_OPENSTACK_TAG ?= stable/pike