20 files changed, 1614 insertions, 1449 deletions
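This change reworks the VES integration from the hard-coded ves_plugin.py event builder into a standalone application (ves_app.py plus normalizer.py) whose output is driven entirely by a YAML schema (host.yaml). Custom YAML tags such as !ArrayItem, !ValueItem, !Number and !MapValue select collectd metrics and substitute their fields into the VES event template using Python str.format() on namespaces like "vl" (a collectd value) and "system". The short sketch below illustrates only that substitution step; the sample data is invented for illustration and is not part of the patch.

# Illustrative only -- shows the "{vl.type}-{vl.type_instance}-{vl.ds_name}" style
# templating that normalizer.py applies to scalar nodes of the YAML schema.
# The SampleValue contents below are made-up placeholders, not values from this patch.
from collections import namedtuple

SampleValue = namedtuple('SampleValue',
                         'plugin plugin_instance type type_instance ds_name value')

vl = SampleValue(plugin='virt', plugin_instance='instance-00000001',
                 type='if_octets', type_instance='tap0', ds_name='rx', value=1234)

template = '{vl.type}-{vl.type_instance}-{vl.ds_name}'
print(template.format(vl=vl))       # -> if_octets-tap0-rx
print('{vl.value}'.format(vl=vl))   # -> 1234

The application itself is started as, for example, "python ves_app.py --events-schema host.yaml --config ves_app_config.conf"; it consumes collectd metrics from the "collectd" Kafka topic and posts the normalized events to the configured VES endpoint.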
diff --git a/3rd_party/collectd-ves-plugin/LICENSE b/3rd_party/collectd-ves-app/LICENSE index 8b515df5..804ec198 100644 --- a/3rd_party/collectd-ves-plugin/LICENSE +++ b/3rd_party/collectd-ves-app/LICENSE @@ -10,10 +10,14 @@ so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/3rd_party/collectd-ves-plugin/PSF_LICENSE_AGREEMENT b/3rd_party/collectd-ves-app/PSF_LICENSE_AGREEMENT index 666648bb..666648bb 100644 --- a/3rd_party/collectd-ves-plugin/PSF_LICENSE_AGREEMENT +++ b/3rd_party/collectd-ves-app/PSF_LICENSE_AGREEMENT diff --git a/3rd_party/collectd-ves-app/ves_app/__init__.py b/3rd_party/collectd-ves-app/ves_app/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/__init__.py diff --git a/3rd_party/collectd-ves-app/ves_app/host.yaml b/3rd_party/collectd-ves-app/ves_app/host.yaml new file mode 100644 index 00000000..a91574cf --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/host.yaml @@ -0,0 +1,214 @@ +--- +# Common event header definition (required fields and defaults) +commonEventHeader: &commonEventHeader + domain: N/A + eventId: "{system.id}" + eventName: "" + eventType: Info + lastEpochMicrosec: 0 + priority: Normal + reportingEntityId: &reportingEntityId "{system.hostname}" + reportingEntityName: *reportingEntityId + sequence: 0 + sourceName: N/A + startEpochMicrosec: 0 + version: 2.0 + +# Value mapping (used to map collectd notification severity to VES) +collectdSeverityMapping: &collectdSeverityMapping + NOTIF_FAILURE: CRITICAL + NOTIF_WARNING: WARNING + NOTIF_OKAY: NORMAL + +# Measurements definition +Host Measurements: !Measurements + - ITEM-DESC: + event: + commonEventHeader: + <<: *commonEventHeader + eventType: hostOS + domain: measurementsForVfScaling + sourceId: &sourceId "{vl.plugin_instance}" + sourceName: *sourceId + startEpochMicrosec: !Number "{vl.time}" + measurementsForVfScalingFields: + measurementsForVfScalingVersion: 2.0 + additionalFields: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: "/^(?!memory|virt_vcpu|disk_octets|disk_ops|if_packets|if_errors|if_octets|if_dropped).*$/" + - ITEM-DESC: + name: "{vl.type}-{vl.type_instance}-{vl.ds_name}" + value: "{vl.value}" + additionalMeasurements: !ArrayItem + - SELECT: + plugin: "/^(?!virt).*$/" + - INDEX-KEY: + - plugin + - plugin_instance + - ITEM-DESC: + name: "{vl.plugin}-{vl.plugin_instance}" + 
arrayOfFields: !ArrayItem + - SELECT: + plugin: "{vl.plugin}" + plugin_instance: "{vl.plugin_instance}" + - ITEM-DESC: + name: "{vl.type}-{vl.type_instance}-{vl.ds_name}" + value: "{vl.value}" + measurementInterval: !Number "{vl.interval}" + memoryUsageArray: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: memory + type_instance: total + - ITEM-DESC: + memoryConfigured: !Bytes2Kibibytes "{vl.value}" + vmIdentifier: "{vl.plugin_instance}" + memoryUsed: 0.0 + memoryFree: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: memory + type_instance: rss + - VALUE: !Bytes2Kibibytes "{vl.value}" + - DEFAULT: 0 + cpuUsageArray: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: virt_vcpu + - ITEM-DESC: + cpuIdentifier: "{vl.type_instance}" + percentUsage: !Number "{vl.value}" + vNicPerformanceArray: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_packets + ds_name: rx + - ITEM-DESC: + valuesAreSuspect: "true" + vNicIdentifier: "{vl.type_instance}" + receivedTotalPacketsAccumulated: !Number "{vl.value}" + transmittedTotalPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_packets + type_instance: "{vl.type_instance}" + ds_name: tx + receivedOctetsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_octets + type_instance: "{vl.type_instance}" + ds_name: rx + transmittedOctetsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_octets + type_instance: "{vl.type_instance}" + ds_name: tx + receivedErrorPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_errors + type_instance: "{vl.type_instance}" + ds_name: rx + transmittedErrorPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_errors + type_instance: "{vl.type_instance}" + ds_name: tx + receivedDiscardedPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_dropped + type_instance: "{vl.type_instance}" + ds_name: rx + transmittedDiscardedPacketsAccumulated: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: if_dropped + type_instance: "{vl.type_instance}" + ds_name: tx + diskUsageArray: !ArrayItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: disk_octets + ds_name: read + - ITEM-DESC: + diskIdentifier: "{vl.type_instance}" + diskOctetsReadLast: !Number "{vl.value}" + diskOctetsWriteLast: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: disk_octets + type_instance: "{vl.type_instance}" + ds_name: write + diskOpsReadLast: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: disk_ops + type_instance: "{vl.type_instance}" + ds_name: read + diskOpsWriteLast: !ValueItem + - SELECT: + plugin: virt + plugin_instance: "{vl.plugin_instance}" + type: disk_ops + type_instance: "{vl.type_instance}" + ds_name: write + - SELECT: + plugin: virt + type_instance: virt_cpu_total + +Virt Events: !Events + - ITEM-DESC: + event: + commonEventHeader: &event_commonEventHeader + <<: *commonEventHeader + domain: fault + eventType: Notification + sourceId: &event_sourceId "{n.plugin_instance}" + sourceName: *event_sourceId + lastEpochMicrosec: !Number 
"{n.time}" + startEpochMicrosec: !Number "{n.time}" + faultFields: &faultFields + alarmInterfaceA: "{n.plugin}-{n.plugin_instance}" + alarmCondition: "{n.message}" + eventSeverity: !MapValue + VALUE: "{n.severity}" + TO: *collectdSeverityMapping + eventSourceType: hypervisor + faultFieldsVersion: 1.1 + specificProblem: "{n.plugin_instance}-{n.type_instance}" + vfStatus: Active + - CONDITION: + plugin: virt + +Host Events: !Events + - ITEM-DESC: + event: + commonEventHeader: + <<: *event_commonEventHeader + sourceId: "{system.hostname}" + sourceName: "{system.hostname}" + faultFields: + <<: *faultFields + eventSourceType: host + - CONDITION: + plugin: "/^(?!virt).*$/" diff --git a/3rd_party/collectd-ves-app/ves_app/normalizer.py b/3rd_party/collectd-ves-app/ves_app/normalizer.py new file mode 100644 index 00000000..899c850b --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/normalizer.py @@ -0,0 +1,598 @@ +# +# Copyright(c) 2017 Intel Corporation. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com> +# + +import yaml +import logging +import datetime +import time +from threading import RLock +from threading import Timer +from threading import Thread +import re + +# import YAML loader +try: + from yaml import CLoader as Loader +except ImportError: + from yaml import Loader + +# import synchronized queue +try: + # python 2.x + import Queue as queue +except ImportError: + # python 3.x + import queue + + +class Config(object): + """Configuration class used to pass config option into YAML file""" + + def __init__(self, interval): + self.interval = interval + + +class System(object): + """System class which provides information like host, time etc., into YAML + file""" + + def __init__(self): + self.hostname = 'localhost' + self._id = 0 + + @property + def id(self): + self._id = self._id + 1 + return self._id + + @property + def time(self): + return time.time() + + @property + def date(self): + return datetime.date.today().isoformat() + + +class ItemIterator(object): + """Item iterator returned by Collector class""" + + def __init__(self, collector, items): + """Item iterator init""" + logging.debug('{}:__init__()'.format(self.__class__.__name__)) + self._items = items + self._collector = collector + self._index = 0 + + def next(self): + """Returns next item from the list""" + if self._index == len(self._items): + raise StopIteration + curr_index = self._index + self._index = curr_index + 1 + return self.items[curr_index] + + def __getitem__(self, key): + """get item by index""" + return self._items[key] + + def __len__(self): + """Return length of elements""" + return len(self._items) + + def __del__(self): + """Destroy iterator and unlock the collector""" + logging.debug('{}:__del__()'.format(self.__class__.__name__)) + self._collector.unlock() + + +class ItemObject(object): + """Item object returned by Collector class""" + + def __init__(self, collector, hash_): + """Item object init""" + 
logging.debug('{}:__init__()'.format(self.__class__.__name__)) + super(ItemObject, self).__setattr__('_collector', collector) + super(ItemObject, self).__setattr__('_hash', hash_) + + def __setattr__(self, name, value): + t, item = self._collector._metrics[self._hash] + logging.debug('{}:__setattr__(name={}, value={})'.format( + self.__class__.__name__, name, value)) + setattr(item, name, value) + self._collector._metrics[self._hash] = (time.time(), item) + + def __del__(self): + """Destroy item object and unlock the collector""" + logging.debug('{}:__del__()'.format(self.__class__.__name__)) + self._collector.unlock() + + +class Collector(object): + """Thread-safe collector with aging feature""" + + def __init__(self, age_timeout): + """Initialization""" + self._metrics = {} + self._lock = RLock() + self._age_timeout = age_timeout + self._start_age_timer() + + def _start_age_timer(self): + """Start age timer""" + self._age_timer = Timer(self._age_timeout, self._on_timer) + self._age_timer.start() + + def _stop_age_timer(self): + """Stop age timer""" + self._age_timer.cancel() + + def _on_timer(self): + """Age timer""" + self._start_age_timer() + self._check_aging() + + def _check_aging(self): + """Check aging time for all items""" + self.lock() + for data_hash, data in self._metrics.items(): + age, item = data + if ((time.time() - age) >= self._age_timeout): + # aging time has expired, remove the item from the collector + logging.debug('{}:_check_aging():value={}'.format( + self.__class__.__name__, item)) + self._metrics.pop(data_hash) + del(item) + self.unlock() + + def lock(self): + """Lock the collector""" + logging.debug('{}:lock()'.format(self.__class__.__name__)) + self._lock.acquire() + + def unlock(self): + """Unlock the collector""" + logging.debug('{}:unlock()'.format(self.__class__.__name__)) + self._lock.release() + + def get(self, hash_): + self.lock() + if hash_ in self._metrics: + return ItemObject(self, hash_) + self.unlock() + return None + + def add(self, item): + """Add an item into the collector""" + self.lock() + logging.debug('{}:add(item={})'.format(self.__class__.__name__, item)) + self._metrics[hash(item)] = (time.time(), item) + self.unlock() + + def items(self, select_list=[]): + """Returns locked (safe) item iterator""" + metrics = [] + self.lock() + for k, item in self._metrics.items(): + _, value = item + for select in select_list: + if value.match(**select): + metrics.append(value) + return ItemIterator(self, metrics) + + def destroy(self): + """Destroy the collector""" + self._stop_age_timer() + + +class CollectdData(object): + """Base class for Collectd data""" + + def __init__(self, host=None, plugin=None, plugin_instance=None, + type_=None, type_instance=None, time_=None): + """Class initialization""" + self.host = host + self.plugin = plugin + self.plugin_instance = plugin_instance + self.type_instance = type_instance + self.type = type_ + self.time = time_ + + @classmethod + def is_regular_expression(cls, expr): + return True if expr[0] == '/' and expr[-1] == '/' else False + + def match(self, **kargs): + # compare the metric + for key, value in kargs.items(): + if self.is_regular_expression(value): + if re.match(value[1:-1], getattr(self, key)) is None: + return False + elif value != getattr(self, key): + return False + # return match event if kargs is empty + return True + + +class CollectdNotification(CollectdData): + """Collectd notification""" + + def __init__(self, host=None, plugin=None, plugin_instance=None, + type_=None, type_instance=None, 
severity=None, message=None): + super(CollectdNotification, self).__init__( + host, plugin, plugin_instance, type_, type_instance) + self.severity = severity + self.message = message + + def __repr__(self): + return '{}(host={}, plugin={}, plugin_instance={}, type={},' \ + 'type_instance={}, severity={}, message={}, time={})'.format( + self.__class__.__name__, self.host, self.plugin, + self.plugin_instance, self.type, self.type_instance, + self.severity, self.message, time) + + +class CollectdValue(CollectdData): + """Collectd value""" + + def __init__(self, host=None, plugin=None, plugin_instance=None, + type_=None, type_instance=None, ds_name='value', value=None, + interval=None): + super(CollectdValue, self).__init__( + host, plugin, plugin_instance, type_, type_instance) + self.value = value + self.ds_name = ds_name + self.interval = interval + + @classmethod + def hash_gen(cls, host, plugin, plugin_instance, type_, + type_instance, ds_name): + return hash((host, plugin, plugin_instance, type_, + type_instance, ds_name)) + + def __eq__(self, other): + return hash(self) == hash(other) and self.value == other.value + + def __hash__(self): + return self.hash_gen(self.host, self.plugin, self.plugin_instance, + self.type, self.type_instance, self.ds_name) + + def __repr__(self): + return '{}(host={}, plugin={}, plugin_instance={}, type={},' \ + 'type_instance={}, ds_name={}, value={}, time={})'.format( + self.__class__.__name__, self.host, self.plugin, + self.plugin_instance, self.type, self.type_instance, + self.ds_name, self.value, self.time) + + +class Item(yaml.YAMLObject): + """Base class to process tags like ArrayItem/ValueItem""" + + @classmethod + def format_node(cls, mapping, metric): + if mapping.tag in [ + 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag, + Number.yaml_tag]: + return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric)) + elif mapping.tag == 'tag:yaml.org,2002:map': + values = [] + for key, value in mapping.value: + values.append((yaml.ScalarNode(key.tag, key.value), + cls.format_node(value, metric))) + return yaml.MappingNode(mapping.tag, values) + elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]: + values = [] + for seq in mapping.value: + map_values = list() + for key, value in seq.value: + if key.value == 'SELECT': + map_values.append((yaml.ScalarNode(key.tag, key.value), + cls.format_node(value, metric))) + else: + map_values.append((yaml.ScalarNode(key.tag, key.value), + value)) + values.append(yaml.MappingNode(seq.tag, map_values)) + return yaml.SequenceNode(mapping.tag, values) + elif mapping.tag in [MapValue.yaml_tag]: + values = [] + for key, value in mapping.value: + if key.value == 'VALUE': + values.append((yaml.ScalarNode(key.tag, key.value), + cls.format_node(value, metric))) + else: + values.append((yaml.ScalarNode(key.tag, key.value), value)) + return yaml.MappingNode(mapping.tag, values) + return mapping + + +class ValueItem(Item): + """Class to process VlaueItem tag""" + yaml_tag = u'!ValueItem' + + @classmethod + def from_yaml(cls, loader, node): + logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader)) + default, select, value_desc = None, list(), None + # find value description + for elem in node.value: + for key, value in elem.value: + if key.value == 'VALUE': + assert value_desc is None, "VALUE key already set" + value_desc = value + if key.value == 'SELECT': + select.append(loader.construct_mapping(value)) + if key.value == 'DEFAULT': + assert default is None, "DEFAULT key already set" + default = 
loader.construct_object(value) + # if VALUE key isn't given, use default VALUE key + # format: `VALUE: !Number '{vl.value}'` + if value_desc is None: + value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}') + # select collectd metric based on SELECT condition + metrics = loader.collector.items(select) + assert len(metrics) < 2, \ + 'Wrong SELECT condition, selected {} metrics'.format(len(metrics)) + if len(metrics) > 0: + item = cls.format_node(value_desc, {'vl': metrics[0], + 'system': loader.system}) + return loader.construct_object(item) + # nothing has been found by SELECT condition, set to DEFAULT value. + assert default is not None, \ + "No metrics selected by SELECT condition and DEFAULT key isn't set" + return default + + +class ArrayItem(Item): + """Class to process ArrayItem tag""" + yaml_tag = u'!ArrayItem' + + @classmethod + def from_yaml(cls, loader, node): + logging.debug('{}:process(loader={}, node={})'.format(cls.__name__, + loader, node)) + # e.g.: + # SequenceNode(tag=u'!ArrayItem', value=[ + # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ + # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'), + # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ + # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'), + # , ...) + # ]), ... + # ), (key, value), ... ]) + # , ... ]) + assert isinstance(node, yaml.SequenceNode), \ + "{} tag isn't YAML array".format(cls.__name__) + select, index_keys, items, item_desc = list(), list(), list(), None + for elem in node.value: + for key, value in elem.value: + if key.value == 'ITEM-DESC': + assert item_desc is None, "ITEM-DESC key already set" + item_desc = value + if key.value == 'INDEX-KEY': + assert len(index_keys) == 0, "INDEX-KEY key already set" + index_keys = loader.construct_sequence(value) + if key.value == 'SELECT': + select.append(loader.construct_mapping(value)) + # validate item description + assert item_desc is not None, "Mandatory ITEM-DESC key isn't set" + assert len(select) > 0 or len(index_keys) > 0, \ + "Mandatory key (INDEX-KEY or SELECT) isn't set" + metrics = loader.collector.items(select) + # select metrics based on INDEX-KEY provided + if len(index_keys) > 0: + metric_set = set() + for metric in metrics: + value_params = {} + for key in index_keys: + value_params[key] = getattr(metric, key) + metric_set.add(CollectdValue(**value_params)) + metrics = list(metric_set) + # build items based on SELECT and/or INDEX-KEY criteria + for metric in metrics: + item = cls.format_node(item_desc, + {'vl': metric, 'system': loader.system, + 'config': loader.config}) + items.append(loader.construct_mapping(item)) + return items + + +class Measurements(ArrayItem): + """Class to process Measurements tag""" + yaml_tag = u'!Measurements' + + +class Events(Item): + """Class to process Events tag""" + yaml_tag = u'!Events' + + @classmethod + def from_yaml(cls, loader, node): + condition, item_desc = dict(), None + for elem in node.value: + for key, value in elem.value: + if key.value == 'ITEM-DESC': + item_desc = value + if key.value == 'CONDITION': + condition = loader.construct_mapping(value) + assert item_desc is not None, "Mandatory ITEM-DESC key isn't set" + if loader.notification.match(**condition): + item = cls.format_node(item_desc, { + 'n': loader.notification, 'system': loader.system}) + return loader.construct_mapping(item) + return None + + +class Bytes2Kibibytes(yaml.YAMLObject): + """Class to process Bytes2Kibibytes tag""" + yaml_tag = u'!Bytes2Kibibytes' + + @classmethod + def from_yaml(cls, 
loader, node): + return round(float(node.value) / 1024.0, 3) + + +class Number(yaml.YAMLObject): + """Class to process Number tag""" + yaml_tag = u'!Number' + + @classmethod + def from_yaml(cls, loader, node): + try: + return int(node.value) + except ValueError: + return float(node.value) + + +class MapValue(yaml.YAMLObject): + """Class to process MapValue tag""" + yaml_tag = u'!MapValue' + + @classmethod + def from_yaml(cls, loader, node): + mapping, val = None, None + for key, value in node.value: + if key.value == 'TO': + mapping = loader.construct_mapping(value) + if key.value == 'VALUE': + val = loader.construct_object(value) + assert mapping is not None, "Mandatory TO key isn't set" + assert val is not None, "Mandatory VALUE key isn't set" + assert val in mapping, \ + 'Value "{}" cannot be mapped to any of {} values'.format( + val, mapping.keys()) + return mapping[val] + + +class Normalizer(object): + """Normalization class which handles events and measurements""" + + def __init__(self): + """Init""" + self.interval = None + self.collector = None + self.system = None + self.queue = None + self.timer = None + + @classmethod + def read_configuration(cls, config_file): + """read YAML configuration file""" + # load YAML events/measurements definition + f = open(config_file, 'r') + doc_yaml = yaml.compose(f) + f.close() + # split events & measurements definitions + measurements, events = list(), list() + for key, value in doc_yaml.value: + if value.tag == Measurements.yaml_tag: + measurements.append((key, value)) + if value.tag == Events.yaml_tag: + events.append((key, value)) + measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', + measurements) + measurements_stream = yaml.serialize(measurements_yaml) + events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events) + events_stream = yaml.serialize(events_yaml) + # return event & measurements definition + return events_stream, measurements_stream + + def initialize(self, config_file, interval): + """Initialize the class""" + e, m = self.read_configuration(config_file) + self.measurements_stream = m + self.events_stream = e + self.system = System() + self.config = Config(interval) + self.interval = interval + # start collector with aging time = double interval + self.collector = Collector(interval * 2) + # initialize event thread + self.queue = queue.Queue() + self.event_thread = Thread(target=self.event_worker) + self.event_thread.daemon = True + self.event_thread.start() + # initialize measurements timer + self.start_timer() + + def destroy(self): + """Destroy the class""" + self.collector.destroy() + self.post_event(None) # send stop event + self.event_thread.join() + self.stop_timer() + + def start_timer(self): + """Start measurements timer""" + self.timer = Timer(self.interval, self.on_timer) + self.timer.start() + + def stop_timer(self): + """Stop measurements timer""" + self.timer.cancel() + + def on_timer(self): + """Measurements timer""" + self.start_timer() + self.process_measurements() + + def event_worker(self): + """Event worker""" + while True: + event = self.queue.get() + if isinstance(event, CollectdNotification): + self.process_notify(event) + continue + # exit for the worker + break + + def get_collector(self): + """Get metric collector reference""" + return self.collector + + def process_measurements(self): + """Process measurements""" + loader = Loader(self.measurements_stream) + setattr(loader, 'collector', self.collector) + setattr(loader, 'system', self.system) + setattr(loader, 'config', self.config) + 
measurements = loader.get_data() + for measurement_name in measurements: + logging.debug('Process "{}" measurements: {}'.format( + measurement_name, measurements[measurement_name])) + for measurement in measurements[measurement_name]: + self.send_data(measurement) + + def process_notify(self, notification): + """Process events""" + loader = Loader(self.events_stream) + setattr(loader, 'notification', notification) + setattr(loader, 'system', self.system) + notifications = loader.get_data() + for notify_name in notifications: + logging.debug('Process "{}" notification'.format(notify_name)) + if notifications[notify_name] is not None: + self.send_data(notifications[notify_name]) + + def send_data(self, data): + """Send data""" + assert False, 'send_data() is abstract function and MUST be overridden' + + def post_event(self, notification): + """Post notification into the queue to process""" + self.queue.put(notification) diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app.py b/3rd_party/collectd-ves-app/ves_app/ves_app.py new file mode 100644 index 00000000..105c66e2 --- /dev/null +++ b/3rd_party/collectd-ves-app/ves_app/ves_app.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import sys +import base64 +import ConfigParser +import logging +import argparse + +from distutils.util import strtobool +from kafka import KafkaConsumer + +from normalizer import Normalizer +from normalizer import CollectdValue + +try: + # For Python 3.0 and later + import urllib.request as url +except ImportError: + # Fall back to Python 2's urllib2 + import urllib2 as url + + +class VESApp(Normalizer): + """VES Application""" + + def __init__(self): + """Application initialization""" + self._app_config = { + 'Domain': '127.0.0.1', + 'Port': 30000, + 'Path': '', + 'Username': '', + 'Password': '', + 'Topic': '', + 'UseHttps': False, + 'SendEventInterval': 20.0, + 'ApiVersion': 5.1, + 'KafkaPort': 9092, + 'KafkaBroker': 'localhost' + } + + def send_data(self, event): + """Send event to VES""" + server_url = "http{}://{}:{}{}/eventListener/v{}{}".format( + 's' if self._app_config['UseHttps'] else '', + self._app_config['Domain'], int(self._app_config['Port']), + '{}'.format('/{}'.format(self._app_config['Path']) if len( + self._app_config['Path']) > 0 else ''), + int(self._app_config['ApiVersion']), '{}'.format( + '/{}'.format(self._app_config['Topic']) if len( + self._app_config['Topic']) > 0 else '')) + logging.info('Vendor Event Listener is at: {}'.format(server_url)) + credentials = base64.b64encode('{}:{}'.format( + self._app_config['Username'], + self._app_config['Password']).encode()).decode() + logging.info('Authentication credentials are: {}'.format(credentials)) + try: + request = url.Request(server_url) + request.add_header('Authorization', 'Basic {}'.format(credentials)) + request.add_header('Content-Type', 'application/json') + event_str = json.dumps(event).encode() + logging.debug("Sending {} to {}".format(event_str, server_url)) + 
url.urlopen(request, event_str, timeout=1) + logging.debug("Sent data to {} successfully".format(server_url)) + except url.HTTPError as e: + logging.error('Vendor Event Listener exception: {}'.format(e)) + except url.URLError as e: + logging.error( + 'Vendor Event Listener is not reachable: {}'.format(e)) + except Exception as e: + logging.error('Vendor Event Listener error: {}'.format(e)) + + def config(self, config): + """VES option configuration""" + for key, value in config.items('config'): + if key in self._app_config: + try: + if type(self._app_config[key]) == int: + value = int(value) + elif type(self._app_config[key]) == float: + value = float(value) + elif type(self._app_config[key]) == bool: + value = bool(strtobool(value)) + + if isinstance(value, type(self._app_config[key])): + self._app_config[key] = value + else: + logging.error("Type mismatch with %s" % key) + sys.exit() + except ValueError: + logging.error("Incorrect value type for %s" % key) + sys.exit() + else: + logging.error("Incorrect key configuration %s" % key) + sys.exit() + + def init(self, configfile, schema_file): + if configfile is not None: + # read VES configuration file if provided + config = ConfigParser.ConfigParser() + config.optionxform = lambda option: option + config.read(configfile) + self.config(config) + # initialize normalizer + self.initialize(schema_file, self._app_config['SendEventInterval']) + + def run(self): + """Consume JSON data from the Kafka broker""" + kafka_server = '{}:{}'.format( + self._app_config.get('KafkaBroker'), + self._app_config.get('KafkaPort')) + consumer = KafkaConsumer( + 'collectd', bootstrap_servers=kafka_server, + auto_offset_reset='latest', enable_auto_commit=False, + value_deserializer=lambda m: json.loads(m.decode('ascii'))) + + for message in consumer: + for kafka_data in message.value: + # { + # u'dstypes': [u'derive'], + # u'plugin': u'cpu', + # u'dsnames': [u'value'], + # u'interval': 10.0, + # u'host': u'localhost', + # u'values': [99.9978996416267], + # u'time': 1502114956.244, + # u'plugin_instance': u'44', + # u'type_instance': u'idle', + # u'type': u'cpu' + # } + logging.debug('{}:run():data={}'.format( + self.__class__.__name__, kafka_data)) + for ds_name in kafka_data['dsnames']: + index = kafka_data['dsnames'].index(ds_name) + val_hash = CollectdValue.hash_gen( + kafka_data['host'], kafka_data['plugin'], + kafka_data['plugin_instance'], kafka_data['type'], + kafka_data['type_instance'], ds_name) + collector = self.get_collector() + val = collector.get(val_hash) + if val: + # update the value + val.value = kafka_data['values'][index] + val.time = kafka_data['time'] + del(val) + else: + # add new value into the collector + val = CollectdValue() + val.host = kafka_data['host'] + val.plugin = kafka_data['plugin'] + val.plugin_instance = kafka_data['plugin_instance'] + val.type = kafka_data['type'] + val.type_instance = kafka_data['type_instance'] + val.value = kafka_data['values'][index] + val.interval = kafka_data['interval'] + val.time = kafka_data['time'] + val.ds_name = ds_name + collector.add(val) + + +def main(): + # Parsing cmdline options + parser = argparse.ArgumentParser() + parser.add_argument("--events-schema", dest="schema", required=True, + help="YAML events schema definition", metavar="FILE") + parser.add_argument("--config", dest="configfile", default=None, + help="Specify config file", metavar="FILE") + parser.add_argument("--loglevel", dest="level", default='INFO', + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], + help="Specify log level 
(default: %(default)s)", + metavar="LEVEL") + parser.add_argument("--logfile", dest="logfile", default='ves_app.log', + help="Specify log file (default: %(default)s)", + metavar="FILE") + args = parser.parse_args() + + # Create log file + logging.basicConfig(filename=args.logfile, + format='%(asctime)s %(message)s', + level=args.level) + if args.configfile is None: + logging.warning("No configfile specified, using default options") + + # Create Application Instance + application_instance = VESApp() + application_instance.init(args.configfile, args.schema) + + try: + # Run the plugin + application_instance.run() + except KeyboardInterrupt: + logging.info(" - Ctrl-C handled, exiting gracefully") + except Exception as e: + logging.error('{}, {}'.format(type(e), e)) + finally: + application_instance.destroy() + sys.exit() + + +if __name__ == '__main__': + main() diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin_config.conf b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf index 1dccd49b..aee0816c 100644 --- a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin_config.conf +++ b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf @@ -6,8 +6,7 @@ Topic = example_vnf UseHttps = false Username = Password = -FunctionalRole = Collectd VES Agent SendEventInterval = 20 ApiVersion = 3 KafkaPort = 9092 -KafkaBroker = localhost
\ No newline at end of file +KafkaBroker = localhost diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/__init__.py b/3rd_party/collectd-ves-plugin/ves_plugin/__init__.py deleted file mode 100644 index 14364a35..00000000 --- a/3rd_party/collectd-ves-plugin/ves_plugin/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py deleted file mode 100644 index 4c313cee..00000000 --- a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py +++ /dev/null @@ -1,945 +0,0 @@ -#!/usr/bin/env python -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import sys -import base64 -import ConfigParser -import logging -import argparse - -from distutils.util import strtobool -from kafka import KafkaConsumer - -try: - # For Python 3.0 and later - import urllib.request as url -except ImportError: - # Fall back to Python 2's urllib2 - import urllib2 as url -import socket -import time -from threading import Timer -from threading import Lock - - -class Event(object): - """Event header""" - - def __init__(self): - """Construct the common header""" - self.version = 2.0 - self.event_type = "Info" # use "Info" unless a notification is generated - self.domain = "" - self.event_id = "" - self.source_id = "" - self.source_name = "" - self.functional_role = "" - self.reporting_entity_id = "" - self.reporting_entity_name = "" - self.priority = "Normal" # will be derived from event if there is one - self.start_epoch_microsec = 0 - self.last_epoch_micro_sec = 0 - self.sequence = 0 - self.event_name = "" - self.internal_header_fields = {} - self.nfc_naming_code = "" - self.nf_naming_code = "" - - def get_json(self): - """Get the object of the datatype""" - obj = {} - obj['version'] = self.version - obj['eventType'] = self.event_type - obj['domain'] = self.domain - obj['eventId'] = self.event_id - obj['sourceId'] = self.source_id - obj['sourceName'] = self.source_name - obj['reportingEntityId'] = self.reporting_entity_id - obj['reportingEntityName'] = self.reporting_entity_name - obj['priority'] = self.priority - obj['startEpochMicrosec'] = self.start_epoch_microsec - obj['lastEpochMicrosec'] = self.last_epoch_micro_sec - obj['sequence'] = self.sequence - obj['eventName'] = self.event_name - obj['internalHeaderFields'] = self.internal_header_fields - obj['nfcNamingCode'] = self.nfc_naming_code - obj['nfNamingCode'] = self.nf_naming_code - return json.dumps({ - 
'event': { - 'commonEventHeader': obj, - self.get_name(): self.get_obj() - } - }).encode() - - def get_name(): - assert False, 'abstract method get_name() is not implemented' - - def get_obj(): - assert False, 'abstract method get_obj() is not implemented' - - -class Field(object): - """field datatype""" - - def __init__(self, name, value): - self.name = name - self.value = value - - def get_obj(self): - return { - 'name': self.name, - 'value': self.value - } - - -class NamedArrayOfFields(object): - """namedArrayOfFields datatype""" - - def __init__(self, name): - self.name = name - self.array_of_fields = [] - - def add(self, field): - self.array_of_fields.append(field.get_obj()) - - def get_obj(self): - return { - 'name': self.name, - 'arrayOfFields': self.array_of_fields - } - - -class VESDataType(object): - """ Base VES datatype """ - - def set_optional(self, obj, key, val): - if val is not None: - obj[key] = val - - -class DiskUsage(VESDataType): - """diskUsage datatype""" - - def __init__(self, identifier): - self.disk_identifier = identifier - self.disk_io_time_avg = None - self.disk_io_time_last = None - self.disk_io_time_max = None - self.disk_io_time_min = None - self.disk_merged_read_avg = None - self.disk_merged_read_last = None - self.disk_merged_read_max = None - self.disk_merged_read_min = None - self.disk_merged_write_avg = None - self.disk_merged_write_last = None - self.disk_merged_write_max = None - self.disk_merged_write_min = None - self.disk_octets_read_avg = None - self.disk_octets_read_last = None - self.disk_octets_read_max = None - self.disk_octets_read_min = None - self.disk_octets_write_avg = None - self.disk_octets_write_last = None - self.disk_octets_write_max = None - self.disk_octets_write_min = None - self.disk_ops_read_avg = None - self.disk_ops_read_last = None - self.disk_ops_read_max = None - self.disk_ops_read_min = None - self.disk_ops_write_avg = None - self.disk_ops_write_last = None - self.disk_ops_write_max = None - self.disk_ops_write_min = None - self.disk_pending_operations_avg = None - self.disk_pending_operations_last = None - self.disk_pending_operations_max = None - self.disk_pending_operations_min = None - self.disk_time_read_avg = None - self.disk_time_read_last = None - self.disk_time_read_max = None - self.disk_time_read_min = None - self.disk_time_write_avg = None - self.disk_time_write_last = None - self.disk_time_write_max = None - self.disk_time_write_min = None - - def get_obj(self): - obj = { - # required - 'diskIdentifier': self.disk_identifier - } - self.set_optional(obj, 'diskIoTimeAvg', self.disk_io_time_avg) - self.set_optional(obj, 'diskIoTimeLast', self.disk_io_time_last) - self.set_optional(obj, 'diskIoTimeMax', self.disk_io_time_max) - self.set_optional(obj, 'diskIoTimeMin', self.disk_io_time_min) - self.set_optional(obj, 'diskMergedReadAvg', self.disk_merged_read_avg) - self.set_optional(obj, 'diskMergedReadLast', self.disk_merged_read_last) - self.set_optional(obj, 'diskMergedReadMax', self.disk_merged_read_max) - self.set_optional(obj, 'diskMergedReadMin', self.disk_merged_read_min) - self.set_optional(obj, 'diskMergedWriteAvg', self.disk_merged_write_avg) - self.set_optional(obj, 'diskMergedWriteLast', self.disk_merged_write_last) - self.set_optional(obj, 'diskMergedWriteMax', self.disk_merged_write_max) - self.set_optional(obj, 'diskMergedWriteMin', self.disk_merged_write_min) - self.set_optional(obj, 'diskOctetsReadAvg', self.disk_octets_read_avg) - self.set_optional(obj, 'diskOctetsReadLast', 
self.disk_octets_read_last) - self.set_optional(obj, 'diskOctetsReadMax', self.disk_octets_read_max) - self.set_optional(obj, 'diskOctetsReadMin', self.disk_octets_read_min) - self.set_optional(obj, 'diskOctetsWriteAvg', self.disk_octets_write_avg) - self.set_optional(obj, 'diskOctetsWriteLast', self.disk_octets_write_last) - self.set_optional(obj, 'diskOctetsWriteMax', self.disk_octets_write_max) - self.set_optional(obj, 'diskOctetsWriteMin', self.disk_octets_write_min) - self.set_optional(obj, 'diskOpsReadAvg', self.disk_ops_read_avg) - self.set_optional(obj, 'diskOpsReadLast', self.disk_ops_read_last) - self.set_optional(obj, 'diskOpsReadMax', self.disk_ops_read_max) - self.set_optional(obj, 'diskOpsReadMin', self.disk_ops_read_min) - self.set_optional(obj, 'diskOpsWriteAvg', self.disk_ops_write_avg) - self.set_optional(obj, 'diskOpsWriteLast', self.disk_ops_write_last) - self.set_optional(obj, 'diskOpsWriteMax', self.disk_ops_write_max) - self.set_optional(obj, 'diskOpsWriteMin', self.disk_ops_write_min) - self.set_optional(obj, 'diskPendingOperationsAvg', self.disk_pending_operations_avg) - self.set_optional(obj, 'diskPendingOperationsLast', self.disk_pending_operations_last) - self.set_optional(obj, 'diskPendingOperationsMax', self.disk_pending_operations_max) - self.set_optional(obj, 'diskPendingOperationsMin', self.disk_pending_operations_min) - self.set_optional(obj, 'diskTimeReadAvg', self.disk_time_read_avg) - self.set_optional(obj, 'diskTimeReadLast', self.disk_time_read_last) - self.set_optional(obj, 'diskTimeReadMax', self.disk_time_read_max) - self.set_optional(obj, 'diskTimeReadMin', self.disk_time_read_min) - self.set_optional(obj, 'diskTimeWriteAvg', self.disk_time_write_avg) - self.set_optional(obj, 'diskTimeWriteLast', self.disk_time_write_last) - self.set_optional(obj, 'diskTimeWriteMax', self.disk_time_write_max) - self.set_optional(obj, 'diskTimeWriteMin', self.disk_time_write_min) - return obj - - -class VNicPerformance(VESDataType): - """vNicPerformance datatype""" - - def __init__(self, identifier): - self.received_broadcast_packets_accumulated = None - self.received_broadcast_packets_delta = None - self.received_discarded_packets_accumulated = None - self.received_discarded_packets_delta = None - self.received_error_packets_accumulated = None - self.received_error_packets_delta = None - self.received_multicast_packets_accumulated = None - self.received_multicast_packets_delta = None - self.received_octets_accumulated = None - self.received_octets_delta = None - self.received_total_packets_accumulated = None - self.received_total_packets_delta = None - self.received_unicast_packets_accumulated = None - self.received_unicast_packets_delta = None - self.transmitted_broadcast_packets_accumulated = None - self.transmitted_broadcast_packets_delta = None - self.transmitted_discarded_packets_accumulated = None - self.transmitted_discarded_packets_delta = None - self.transmitted_error_packets_accumulated = None - self.transmitted_error_packets_delta = None - self.transmitted_multicast_packets_accumulated = None - self.transmitted_multicast_packets_delta = None - self.transmitted_octets_accumulated = None - self.transmitted_octets_delta = None - self.transmitted_total_packets_accumulated = None - self.transmitted_total_packets_delta = None - self.transmitted_unicast_packets_accumulated = None - self.transmitted_unicast_packets_delta = None - self.values_are_suspect = 'true' - self.v_nic_identifier = identifier - - def get_obj(self): - obj = { - # required - 
'valuesAreSuspect': self.values_are_suspect, - 'vNicIdentifier': self.v_nic_identifier - } - # optional - self.set_optional(obj, 'receivedBroadcastPacketsAccumulated', self.received_broadcast_packets_accumulated) - self.set_optional(obj, 'receivedBroadcastPacketsDelta', self.received_broadcast_packets_delta) - self.set_optional(obj, 'receivedDiscardedPacketsAccumulated', self.received_discarded_packets_accumulated) - self.set_optional(obj, 'receivedDiscardedPacketsDelta', self.received_discarded_packets_delta) - self.set_optional(obj, 'receivedErrorPacketsAccumulated', self.received_error_packets_accumulated) - self.set_optional(obj, 'receivedErrorPacketsDelta', self.received_error_packets_delta) - self.set_optional(obj, 'receivedMulticastPacketsAccumulated', self.received_multicast_packets_accumulated) - self.set_optional(obj, 'receivedMulticastPacketsDelta', self.received_multicast_packets_delta) - self.set_optional(obj, 'receivedOctetsAccumulated', self.received_octets_accumulated) - self.set_optional(obj, 'receivedOctetsDelta', self.received_octets_delta) - self.set_optional(obj, 'receivedTotalPacketsAccumulated', self.received_total_packets_accumulated) - self.set_optional(obj, 'receivedTotalPacketsDelta', self.received_total_packets_delta) - self.set_optional(obj, 'receivedUnicastPacketsAccumulated', self.received_unicast_packets_accumulated) - self.set_optional(obj, 'receivedUnicastPacketsDelta', self.received_unicast_packets_delta) - self.set_optional(obj, 'transmittedBroadcastPacketsAccumulated', self.transmitted_broadcast_packets_accumulated) - self.set_optional(obj, 'transmittedBroadcastPacketsDelta', self.transmitted_broadcast_packets_delta) - self.set_optional(obj, 'transmittedDiscardedPacketsAccumulated', self.transmitted_discarded_packets_accumulated) - self.set_optional(obj, 'transmittedDiscardedPacketsDelta', self.transmitted_discarded_packets_delta) - self.set_optional(obj, 'transmittedErrorPacketsAccumulated', self.transmitted_error_packets_accumulated) - self.set_optional(obj, 'transmittedErrorPacketsDelta', self.transmitted_error_packets_delta) - self.set_optional(obj, 'transmittedMulticastPacketsAccumulated', self.transmitted_multicast_packets_accumulated) - self.set_optional(obj, 'transmittedMulticastPacketsDelta', self.transmitted_multicast_packets_delta) - self.set_optional(obj, 'transmittedOctetsAccumulated', self.transmitted_octets_accumulated) - self.set_optional(obj, 'transmittedOctetsDelta', self.transmitted_octets_delta) - self.set_optional(obj, 'transmittedTotalPacketsAccumulated', self.transmitted_total_packets_accumulated) - self.set_optional(obj, 'transmittedTotalPacketsDelta', self.transmitted_total_packets_delta) - self.set_optional(obj, 'transmittedUnicastPacketsAccumulated', self.transmitted_unicast_packets_accumulated) - self.set_optional(obj, 'transmittedUnicastPacketsDelta', self.transmitted_unicast_packets_delta) - return obj - - -class CpuUsage(VESDataType): - """cpuUsage datatype""" - - def __init__(self, identifier): - self.cpu_identifier = identifier - self.cpu_idle = None - self.cpu_usage_interrupt = None - self.cpu_usage_nice = None - self.cpu_usage_soft_irq = None - self.cpu_usage_steal = None - self.cpu_usage_system = None - self.cpu_usage_user = None - self.cpu_wait = None - self.percent_usage = 0 - - def get_obj(self): - obj = { - # required - 'cpuIdentifier': self.cpu_identifier, - 'percentUsage': self.percent_usage - } - # optional - self.set_optional(obj, 'cpuIdle', self.cpu_idle) - self.set_optional(obj, 'cpuUsageInterrupt', 
self.cpu_usage_interrupt) - self.set_optional(obj, 'cpuUsageNice', self.cpu_usage_nice) - self.set_optional(obj, 'cpuUsageSoftIrq', self.cpu_usage_soft_irq) - self.set_optional(obj, 'cpuUsageSteal', self.cpu_usage_steal) - self.set_optional(obj, 'cpuUsageSystem', self.cpu_usage_system) - self.set_optional(obj, 'cpuUsageUser', self.cpu_usage_user) - self.set_optional(obj, 'cpuWait', self.cpu_wait) - return obj - - -class MemoryUsage(VESDataType): - """memoryUsage datatype""" - - def __init__(self, identifier): - self.memory_buffered = None - self.memory_cached = None - self.memory_configured = None - self.memory_free = None - self.memory_slab_recl = None - self.memory_slab_unrecl = None - self.memory_used = None - self.vm_identifier = identifier - - def __str__(self): - """ for debug purposes """ - return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n' \ - 'cached : {cached}\nconfigured : {configured}\nfree : {free}\n' \ - 'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n' \ - 'used : {used}\n'.format(buffered=self.memory_buffered, - cached=self.memory_cached, configured=self.memory_configured, - free=self.memory_free, slab_recl=self.memory_slab_recl, - slab_unrecl=self.memory_slab_unrecl, used=self.memory_used, - vm_identifier=self.vm_identifier) - - def get_memory_free(self): - if self.memory_free is None: - # calculate the free memory - if None not in (self.memory_configured, self.memory_used): - return self.memory_configured - self.memory_used - else: - # required field, so return zero - return 0 - else: - return self.memory_free - - def get_memory_used(self): - if self.memory_used is None: - # calculate the memory used - if None not in (self.memory_configured, self.memory_free, self.memory_buffered, - self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): - return self.memory_configured - (self.memory_free + - self.memory_buffered + self.memory_cached + - self.memory_slab_recl + self.memory_slab_unrecl) - else: - # required field, so return zero - return 0 - else: - return self.memory_used - - def get_memory_total(self): - if self.memory_configured is None: - # calculate the total memory - if None not in (self.memory_used, self.memory_free, self.memory_buffered, - self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl): - return (self.memory_used + self.memory_free + - self.memory_buffered + self.memory_cached + - self.memory_slab_recl + self.memory_slab_unrecl) - else: - return None - else: - return self.memory_configured - - def get_obj(self): - obj = { - # required fields - 'memoryFree': self.get_memory_free(), - 'memoryUsed': self.get_memory_used(), - 'vmIdentifier': self.vm_identifier - } - # optional fields - self.set_optional(obj, 'memoryBuffered', self.memory_buffered) - self.set_optional(obj, 'memoryCached', self.memory_cached) - self.set_optional(obj, 'memoryConfigured', self.memory_configured) - self.set_optional(obj, 'memorySlabRecl', self.memory_slab_recl) - self.set_optional(obj, 'memorySlabUnrecl', self.memory_slab_unrecl) - return obj - - -class MeasurementsForVfScaling(Event): - """MeasurementsForVfScaling datatype""" - - def __init__(self, event_id): - """Construct the header""" - super(MeasurementsForVfScaling, self).__init__() - # common attributes - self.domain = "measurementsForVfScaling" - self.event_type = 'hostOS' - self.event_id = event_id - # measurement attributes - self.additional_measurements = [] - self.codec_usage_array = [] - self.concurrent_sessions = 0 - self.configured_entities = 0 - self.cpu_usage_array = 
[] - self.feature_usage_array = [] - self.filesystem_usage_array = [] - self.latency_distribution = [] - self.mean_request_latency = 0 - self.measurement_interval = 0 - self.number_of_media_ports_in_use = 0 - self.request_rate = 0 - self.vnfc_scaling_metric = 0 - self.additional_fields = [] - self.additional_objects = [] - self.disk_usage_array = [] - self.measurements_for_vf_scaling_version = 2.0 - self.memory_usage_array = [] - self.v_nic_performance_array = [] - - def add_additional_measurement(self, named_array): - self.additional_measurements.append(named_array.get_obj()) - - def add_additional_fields(self, field): - self.additional_fields.append(field.get_obj()) - - def add_memory_usage(self, mem_usage): - self.memory_usage_array.append(mem_usage.get_obj()) - - def add_cpu_usage(self, cpu_usage): - self.cpu_usage_array.append(cpu_usage.get_obj()) - - def add_v_nic_performance(self, nic_performance): - self.v_nic_performance_array.append(nic_performance.get_obj()) - - def add_disk_usage(self, disk_usage): - self.disk_usage_array.append(disk_usage.get_obj()) - - def get_obj(self): - """Get the object of the datatype""" - obj = {} - obj['additionalMeasurements'] = self.additional_measurements - obj['codecUsageArray'] = self.codec_usage_array - obj['concurrentSessions'] = self.concurrent_sessions - obj['configuredEntities'] = self.configured_entities - obj['cpuUsageArray'] = self.cpu_usage_array - obj['featureUsageArray'] = self.feature_usage_array - obj['filesystemUsageArray'] = self.filesystem_usage_array - obj['latencyDistribution'] = self.latency_distribution - obj['meanRequestLatency'] = self.mean_request_latency - obj['measurementInterval'] = self.measurement_interval - obj['numberOfMediaPortsInUse'] = self.number_of_media_ports_in_use - obj['requestRate'] = self.request_rate - obj['vnfcScalingMetric'] = self.vnfc_scaling_metric - obj['additionalFields'] = self.additional_fields - obj['additionalObjects'] = self.additional_objects - obj['diskUsageArray'] = self.disk_usage_array - obj['measurementsForVfScalingVersion'] = self.measurements_for_vf_scaling_version - obj['memoryUsageArray'] = self.memory_usage_array - obj['vNicPerformanceArray'] = self.v_nic_performance_array - return obj - - def get_name(self): - """Name of datatype""" - return "measurementsForVfScalingFields" - - -class Fault(Event): - """Fault datatype""" - - def __init__(self, event_id): - """Construct the header""" - super(Fault, self).__init__() - # common attributes - self.domain = "fault" - self.event_id = event_id - self.event_type = "Fault" - # fault attributes - self.fault_fields_version = 1.1 - self.event_severity = 'NORMAL' - self.event_source_type = 'other(0)' - self.alarm_condition = '' - self.specific_problem = '' - self.vf_status = 'Active' - self.alarm_interface_a = '' - self.alarm_additional_information = [] - self.event_category = "" - - def get_name(self): - """Name of datatype""" - return 'faultFields' - - def get_obj(self): - """Get the object of the datatype""" - obj = {} - obj['faultFieldsVersion'] = self.fault_fields_version - obj['eventSeverity'] = self.event_severity - obj['eventSourceType'] = self.event_source_type - obj['alarmCondition'] = self.alarm_condition - obj['specificProblem'] = self.specific_problem - obj['vfStatus'] = self.vf_status - obj['alarmInterfaceA'] = self.alarm_interface_a - obj['alarmAdditionalInformation'] = self.alarm_additional_information - obj['eventCategory'] = self.event_category - return obj - - -class VESApp(object): - """VES Application""" - - def 
__init__(self): - """Application initialization""" - self.__plugin_data_cache = { - 'cpu': {'interval': 0.0, 'vls': []}, - 'virt': {'interval': 0.0, 'vls': []}, - 'disk': {'interval': 0.0, 'vls': []}, - 'interface': {'interval': 0.0, 'vls': []}, - 'memory': {'interval': 0.0, 'vls': []} - } - self.__app_config = { - 'Domain': '127.0.0.1', - 'Port': 30000, - 'Path': '', - 'Username': '', - 'Password': '', - 'Topic': '', - 'UseHttps': False, - 'SendEventInterval': 20.0, - 'FunctionalRole': 'Collectd VES Agent', - 'ApiVersion': 5.1, - 'KafkaPort': 9092, - 'KafkaBroker': 'localhost' - } - self.__host_name = None - self.__ves_timer = None - self.__lock = Lock() - self.__event_id = 0 - - def get_event_id(self): - """get event id""" - self.__event_id += 1 - return str(self.__event_id) - - def lock(self): - """Lock the application""" - self.__lock.acquire() - - def unlock(self): - """Unlock the application""" - self.__lock.release() - - def start_timer(self): - """Start event timer""" - self.__ves_timer = Timer(self.__app_config['SendEventInterval'], self.__on_time) - self.__ves_timer.start() - - def stop_timer(self): - """Stop event timer""" - self.__ves_timer.cancel() - - def __on_time(self): - """Timer thread""" - self.event_timer() - self.start_timer() - - def event_send(self, event): - """Send event to VES""" - server_url = "http{}://{}:{}{}/eventListener/v{}{}".format( - 's' if self.__app_config['UseHttps'] else '', self.__app_config['Domain'], - int(self.__app_config['Port']), '{}'.format( - '/{}'.format(self.__app_config['Path']) if (len(self.__app_config['Path']) > 0) else ''), - int(self.__app_config['ApiVersion']), '{}'.format( - '/{}'.format(self.__app_config['Topic']) if (len(self.__app_config['Topic']) > 0) else '')) - logging.info('Vendor Event Listener is at: {}'.format(server_url)) - credentials = base64.b64encode('{}:{}'.format( - self.__app_config['Username'], self.__app_config['Password']).encode()).decode() - logging.info('Authentication credentials are: {}'.format(credentials)) - try: - request = url.Request(server_url) - request.add_header('Authorization', 'Basic {}'.format(credentials)) - request.add_header('Content-Type', 'application/json') - logging.debug("Sending {} to {}".format(event.get_json(), server_url)) - vel = url.urlopen(request, event.get_json(), timeout=1) - logging.debug("Sent data to {} successfully".format(server_url)) - except url.HTTPError as e: - logging.error('Vendor Event Listener exception: {}'.format(e)) - except url.URLError as e: - logging.error('Vendor Event Listener is is not reachable: {}'.format(e)) - except Exception as e: - logging.error('Vendor Event Listener error: {}'.format(e)) - - def bytes_to_kb(self, bytes): - """Convert bytes to kibibytes""" - return round((bytes / 1024.0), 3) - - def get_hostname(self): - return socket.gethostname() - - def send_host_measurements(self): - # get list of all VMs - virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total', - mark_as_read=False) - vm_names = [x['plugin_instance'] for x in virt_vcpu_total] - for vm_name in vm_names: - # make sure that 'virt' plugin cache is up-to-date - vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name, - mark_as_read=False) - us_up_to_date = True - for vm_value in vm_values: - if vm_value['updated'] == False: - logging.warning("%s" % vm_value) - us_up_to_date = False - break - if not us_up_to_date: - # one of the cache value is not up-to-date, break - logging.warning("virt collectd cache values are not up-to-date for 
{}".format(vm_name)) - continue - # values are up-to-date, create an event message - measurement = MeasurementsForVfScaling(self.get_event_id()) - measurement.functional_role = self.__app_config['FunctionalRole'] - # fill out reporting_entity - measurement.reporting_entity_id = self.get_hostname() - measurement.reporting_entity_name = measurement.reporting_entity_id - # set source as a host value - measurement.source_id = vm_name - measurement.source_name = measurement.source_id - # fill out EpochMicrosec (convert to us) - measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000) - # plugin interval - measurement.measurement_interval = self.__plugin_data_cache['virt']['interval'] - # memoryUsage - mem_usage = MemoryUsage(vm_name) - memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='memory', type_instance='total') - memory_unused = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='memory', type_instance='unused') - memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='memory', type_instance='rss') - if len(memory_total) > 0: - mem_usage.memory_configured = self.bytes_to_kb(memory_total[0]['values'][0]) - if len(memory_unused) > 0: - mem_usage.memory_free = self.bytes_to_kb(memory_unused[0]['values'][0]) - elif len(memory_rss) > 0: - mem_usage.memory_free = self.bytes_to_kb(memory_rss[0]['values'][0]) - # since, "used" metric is not provided by virt plugn, set the rest of the memory stats - # to zero to calculate used based on provided stats only - mem_usage.memory_buffered = mem_usage.memory_cached = mem_usage.memory_slab_recl = \ - mem_usage.memory_slab_unrecl = 0 - measurement.add_memory_usage(mem_usage) - # cpuUsage - virt_vcpus = self.cache_get_value(plugin_instance=vm_name, - plugin_name='virt', type_name='virt_vcpu') - for virt_vcpu in virt_vcpus: - cpu_usage = CpuUsage(virt_vcpu['type_instance']) - cpu_usage.percent_usage = self.cpu_ns_to_percentage(virt_vcpu) - measurement.add_cpu_usage(cpu_usage) - # vNicPerformance - if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_packets', mark_as_read=False) - if_names = [x['type_instance'] for x in if_packets] - for if_name in if_names: - if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_packets', type_instance=if_name) - if_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_octets', type_instance=if_name) - if_errors = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_errors', type_instance=if_name) - if_dropped = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='if_dropped', type_instance=if_name) - v_nic_performance = VNicPerformance(if_name) - v_nic_performance.received_total_packets_accumulated = if_packets[0]['values'][0] - v_nic_performance.transmitted_total_packets_accumulated = if_packets[0]['values'][1] - v_nic_performance.received_octets_accumulated = if_octets[0]['values'][0] - v_nic_performance.transmitted_octets_accumulated = if_octets[0]['values'][1] - v_nic_performance.received_error_packets_accumulated = if_errors[0]['values'][0] - v_nic_performance.transmitted_error_packets_accumulated = if_errors[0]['values'][1] - v_nic_performance.received_discarded_packets_accumulated = if_dropped[0]['values'][0] - v_nic_performance.transmitted_discarded_packets_accumulated = if_dropped[0]['values'][1] - 
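The vNicPerformance block above maps each collectd if_* value list onto the VES accumulated counters, reading index 0 as the receive counter and index 1 as the transmit counter. A minimal sketch of that mapping, using a hypothetical cache entry in the same shape the application stores (the values are made up):

    # Hypothetical cached collectd value-list for a guest interface; for the
    # if_packets/if_octets/if_errors/if_dropped types, index 0 is rx and index 1 is tx.
    cached_if_packets = {'type': 'if_packets', 'type_instance': 'tap0',
                         'dsnames': ['rx', 'tx'], 'values': [1200, 950]}

    # Mirrors the attribute assignments made on the VNicPerformance object above.
    v_nic_performance = {
        'received_total_packets_accumulated': cached_if_packets['values'][0],
        'transmitted_total_packets_accumulated': cached_if_packets['values'][1],
    }
    print(v_nic_performance)
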
measurement.add_v_nic_performance(v_nic_performance) - # diskUsage - disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='disk_octets', mark_as_read=False) - disk_names = [x['type_instance'] for x in disk_octets] - for disk_name in disk_names: - disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='disk_octets', type_instance=disk_name) - disk_ops = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='disk_ops', type_instance=disk_name) - disk_usage = DiskUsage(disk_name) - disk_usage.disk_octets_read_last = disk_octets[0]['values'][0] - disk_usage.disk_octets_write_last = disk_octets[0]['values'][1] - disk_usage.disk_ops_read_last = disk_ops[0]['values'][0] - disk_usage.disk_ops_write_last = disk_ops[0]['values'][1] - measurement.add_disk_usage(disk_usage) - # add additional measurements (perf) - perf_values = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt', - type_name='perf') - named_array = NamedArrayOfFields('perf') - for perf in perf_values: - named_array.add(Field(perf['type_instance'], str(perf['values'][0]))) - measurement.add_additional_measurement(named_array) - # add host values as additional measurements - self.set_additional_fields(measurement, exclude_plugins=['virt']) - # send event to the VES - self.event_send(measurement) - if len(vm_names) > 0: - # mark the additional measurements metrics as read - self.mark_cache_values_as_read(exclude_plugins=['virt']) - - def event_timer(self): - """Event timer thread""" - self.lock() - try: - self.send_host_measurements() - finally: - self.unlock() - - def mark_cache_values_as_read(self, exclude_plugins=None): - """mark the cache values as read""" - for plugin_name in self.__plugin_data_cache.keys(): - if (exclude_plugins != None and plugin_name in exclude_plugins): - # skip excluded plugins - continue; - for val in self.__plugin_data_cache[plugin_name]['vls']: - val['updated'] = False - - def set_additional_measurements(self, measurement, exclude_plugins=None): - """Set addition measurement filed with host/guets values""" - # add host/guest values as additional measurements - for plugin_name in self.__plugin_data_cache.keys(): - if (exclude_plugins != None and plugin_name in exclude_plugins): - # skip excluded plugins - continue; - for val in self.__plugin_data_cache[plugin_name]['vls']: - if val['updated']: - array_name = self.make_dash_string(plugin_name, val['plugin_instance'], - val['type_instance']) - named_array = NamedArrayOfFields(array_name) - - for index in range(len(val['dsnames'])): - mname = '{}-{}'.format(val['type'], val['dsnames'][index]) - named_array.add(Field(mname, str(val['values'][index]))) - measurement.add_additional_measurement(named_array); - val['updated'] = False - - def set_additional_fields(self, measurement, exclude_plugins=None): - # set host values as additional fields - for plugin_name in self.__plugin_data_cache.keys(): - if (exclude_plugins != None and plugin_name in exclude_plugins): - # skip excluded plugins - continue; - for val in self.__plugin_data_cache[plugin_name]['vls']: - if val['updated']: - name_prefix = self.make_dash_string(plugin_name, val['plugin_instance'], - val['type_instance']) - - for index in range(len(val['dsnames'])): - field_name = self.make_dash_string(name_prefix, val['type'], val['dsnames'][index]) - measurement.add_additional_fields(Field(field_name, str(val['values'][index]))) - - def cpu_ns_to_percentage(self, vl): - """Convert CPU usage ns to CPU 
%""" - total = vl['values'][0] - total_time = vl['time'] - pre_total = vl['pre_values'][0] - pre_total_time = vl['pre_time'] - if (total_time - pre_total_time) == 0: - # return zero usage if time diff is zero - return 0.0 - percent = (100.0 * (total - pre_total)) / ((total_time - pre_total_time) * 1000000000.0) - logging.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format( - pre_total_time, pre_total, total_time, total, round(percent, 2))) - return round(percent, 2) - - def make_dash_string(self, *args): - """Join non empty strings with dash symbol""" - return '-'.join(filter(lambda x: len(x) > 0, args)) - - def config(self, config): - """VES option configuration""" - for key, value in config.items('config'): - if key in self.__app_config: - try: - if type(self.__app_config[key]) == int: - value = int(value) - elif type(self.__app_config[key]) == float: - value = float(value) - elif type(self.__app_config[key]) == bool: - value = bool(distutils.util.strtobool(value)) - - if isinstance(value, type(self.__app_config[key])): - self.__app_config[key] = value - else: - logging.error("Type mismatch with %s" % key) - sys.exit() - except ValueError: - logging.error("Incorrect value type for %s" % key) - sys.exit() - else: - logging.error("Incorrect key configuration %s" % key) - sys.exit() - - def init(self): - """Initialisation""" - # start the VES timer - self.start_timer() - - def update_cache_value(self, kafka_data): - """Update value internal collectd cache values or create new one - Please note, the cache should be locked before using this function""" - found = False - if kafka_data['plugin'] not in self.__plugin_data_cache: - self.__plugin_data_cache[kafka_data['plugin']] = {'vls': []} - plugin_vl = self.__plugin_data_cache[kafka_data['plugin']]['vls'] - - for index in range(len(plugin_vl)): - # record found, so just update time the values - if (plugin_vl[index]['plugin_instance'] == kafka_data['plugin_instance']) and \ - (plugin_vl[index]['type_instance'] == kafka_data['type_instance']) and \ - (plugin_vl[index]['type'] == kafka_data['type']): - plugin_vl[index]['pre_time'] = plugin_vl[index]['time'] - plugin_vl[index]['time'] = kafka_data['time'] - plugin_vl[index]['pre_values'] = plugin_vl[index]['values'] - plugin_vl[index]['values'] = kafka_data['values'] - plugin_vl[index]['dsnames'] = kafka_data['dsnames'] - plugin_vl[index]['updated'] = True - found = True - break - if not found: - value = {} - # create new cache record - value['plugin_instance'] = kafka_data['plugin_instance'] - value['type_instance'] = kafka_data['type_instance'] - value['values'] = kafka_data['values'] - value['pre_values'] = kafka_data['values'] - value['type'] = kafka_data['type'] - value['time'] = kafka_data['time'] - value['pre_time'] = kafka_data['time'] - value['host'] = kafka_data['host'] - value['dsnames'] = kafka_data['dsnames'] - value['updated'] = True - self.__plugin_data_cache[kafka_data['plugin']]['vls'].append(value) - # update plugin interval based on one received in the value - self.__plugin_data_cache[kafka_data['plugin']]['interval'] = kafka_data['interval'] - - def cache_get_value(self, plugin_name=None, plugin_instance=None, - type_name=None, type_instance=None, type_names=None, mark_as_read=True): - """Get cache value by given criteria""" - ret_list = [] - if plugin_name in self.__plugin_data_cache: - for val in self.__plugin_data_cache[plugin_name]['vls']: - if (type_name == None or type_name == val['type']) and \ - (plugin_instance == None or plugin_instance == 
val['plugin_instance']) and \ - (type_instance == None or type_instance == val['type_instance']) and \ - (type_names == None or val['type'] in type_names): - if mark_as_read: - val['updated'] = False - ret_list.append(val) - return ret_list - - def shutdown(self): - """Shutdown method for clean up""" - self.stop_timer() - - def run(self): - """Consumer JSON data from kafka broker""" - kafka_server = '%s:%s' % (self.__app_config.get('KafkaBroker'), self.__app_config.get('KafkaPort')) - consumer = KafkaConsumer('collectd', bootstrap_servers=kafka_server, auto_offset_reset='latest', - enable_auto_commit=False, - value_deserializer=lambda m: json.loads(m.decode('ascii'))) - - for message in consumer: - for kafka_data in message.value: - self.lock() - try: - # Receive Kafka data and update cache values - self.update_cache_value(kafka_data) - - finally: - self.unlock() - - -def main(): - # Parsing cmdline options - parser = argparse.ArgumentParser() - parser.add_argument("--config", dest="configfile", default=None, help="Specify config file", metavar="FILE") - parser.add_argument("--loglevel", dest="level", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='WARNING', - help="Specify log level (default: %(default)s)", metavar="LEVEL") - parser.add_argument("--logfile", dest="logfile", default='ves_plugin.log', - help="Specify log file (default: %(default)s)", metavar="FILE") - args = parser.parse_args() - - # Create log file - logging.basicConfig(filename=args.logfile, format='%(asctime)s %(message)s', level=args.level) - - # Create Application Instance - application_instance = VESApp() - - # Read from config file - if args.configfile: - config = ConfigParser.ConfigParser() - config.optionxform = lambda option: option - config.read(args.configfile) - # Write Config Values - application_instance.config(config) - else: - logging.warning("No configfile specified, using default options") - - # Start timer for interval - application_instance.init() - - # Read Data from Kakfa - try: - # Kafka consumer & update cache - application_instance.run() - except KeyboardInterrupt: - logging.info(" - Ctrl-C handled, exiting gracefully") - application_instance.shutdown() - sys.exit() - except: - logging.error("Unknown Error") - - -if __name__ == '__main__': - main() diff --git a/baro_tests/collectd.py b/baro_tests/collectd.py index a002314e..304b87b8 100644 --- a/baro_tests/collectd.py +++ b/baro_tests/collectd.py @@ -11,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. 
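The run() loop above consumes JSON from the collectd Kafka topic and hands each entry to update_cache_value(), which relies on only a handful of fields from the collectd value-list. A sketch of one such entry, restricted to the keys the cache code reads (all values here are hypothetical):

    # Hypothetical single value-list as read from the 'collectd' Kafka topic;
    # message.value is a list of dictionaries of this shape.
    kafka_data = {
        'host': 'compute-0',
        'plugin': 'virt',
        'plugin_instance': 'instance-00000001',
        'type': 'if_packets',
        'type_instance': 'tap0',
        'dsnames': ['rx', 'tx'],
        'values': [1200, 950],
        'time': 1507629698.0,
        'interval': 10.0,
    }
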
+# Patch on October 10 2017 """Executing test of plugins""" @@ -22,7 +23,6 @@ import time import logging import config_server import tests -import subprocess from opnfv.deployment import factory AODH_NAME = 'aodh' @@ -30,7 +30,7 @@ GNOCCHI_NAME = 'gnocchi' ID_RSA_SRC = '/root/.ssh/id_rsa' ID_RSA_DST_DIR = '/root/.ssh' ID_RSA_DST = ID_RSA_DST_DIR + '/id_rsa' -APEX_IP = subprocess.check_output("echo $INSTALLER_IP", shell=True) +APEX_IP = os.getenv("INSTALLER_IP").rstrip('\n') APEX_USER = 'root' APEX_USER_STACK = 'stack' APEX_PKEY = '/root/.ssh/id_rsa' @@ -173,31 +173,6 @@ class AodhClient(object): logger.warning('Aodh is not registered in service catalog') -class SNMPClient(object): - """Client to request SNMP meters""" - def __init__(self, conf, compute_node): - """ - Keyword arguments: - conf -- ConfigServer instance - compute_node -- Compute node object - """ - self.conf = conf - self.compute_node = compute_node - - def get_snmp_metrics(self, compute_node, mib_file, mib_strings): - snmp_output = {} - if mib_file is None: - cmd = "snmpwalk -v 2c -c public localhost IF-MIB::interfaces" - ip = compute_node.get_ip() - snmp_output = self.conf.execute_command(cmd, ip) - else: - for mib_string in mib_strings: - snmp_output[mib_string] = self.conf.execute_command( - "snmpwalk -v2c -m {} -c public localhost {}".format( - mib_file, mib_string), compute_node.get_ip()) - return snmp_output - - class CSVClient(object): """Client to request CSV meters""" def __init__(self, conf): @@ -224,31 +199,39 @@ class CSVClient(object): if compute_name == node.get_dict()['name']: date = node.run_cmd( "date '+%Y-%m-%d'") + hostname = node.run_cmd('hostname -A') + hostname = hostname.split()[0] metrics = [] for plugin_subdir in plugin_subdirectories: for meter_category in meter_categories: stdout1 = node.run_cmd( "tail -2 /var/lib/collectd/csv/" - + "{0}.jf.intel.com/{1}/{2}-{3}".format( - compute_node.get_name(), plugin_subdir, + + "{0}/{1}/{2}-{3}".format( + hostname, plugin_subdir, meter_category, date)) stdout2 = node.run_cmd( "tail -1 /var/lib/collectd/csv/" - + "{0}.jf.intel.com/{1}/{2}-{3}".format( - compute_node.get_name(), plugin_subdir, + + "{0}/{1}/{2}-{3}".format( + hostname, plugin_subdir, meter_category, date)) - # Storing last two values + # Storing last two values values = stdout1 + values2 = stdout2 if values is None: logger.error( 'Getting last two CSV entries of meter category' + ' {0} in {1} subdir failed'.format( meter_category, plugin_subdir)) + elif values2 is None: + logger.error( + 'Getting last CSV entries of meter category' + + ' {0} in {1} subdir failed'.format( + meter_category, plugin_subdir)) else: values = values.split(',') old_value = float(values[0]) - stdout2 = stdout2.split(',') - new_value = float(stdout2[0]) + values2 = values2.split(',') + new_value = float(values2[0]) metrics.append(( plugin_subdir, meter_category, old_value, new_value)) @@ -272,7 +255,7 @@ def get_csv_categories_for_ipmi(conf, compute_node): return [category.strip()[:-11] for category in categories] -def _process_result(compute_node, test, result, results_list): +def _process_result(compute_node, out_plugin, test, result, results_list, node): """Print test result and append it to results list. 
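The CSV client above now derives the per-node directory from hostname -A and tails the per-day CSV files; each collectd CSV row starts with the sample epoch, and the test keeps the first comma-separated field of the last two rows as its old and new readings. A small sketch of that parsing with hypothetical rows:

    # Hypothetical output of the two tail commands on a collectd CSV file
    # (rows have the form "epoch,value[,value...]").
    last_two_rows = "1507629688.123,4096\n1507629698.123,4096"   # tail -2
    last_row = "1507629698.123,4096"                             # tail -1

    old_value = float(last_two_rows.split(',')[0])  # epoch of the older sample
    new_value = float(last_row.split(',')[0])       # epoch of the newest sample
    print(old_value, new_value)
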
Keyword arguments: @@ -282,13 +265,13 @@ def _process_result(compute_node, test, result, results_list): """ if result: logger.info( - 'Compute node {0} test case {1} PASSED.'.format( - compute_node, test)) + 'Test case for {0} with {1} PASSED on {2}.'.format( + node, out_plugin, test)) else: logger.error( - 'Compute node {0} test case {1} FAILED.'.format( - compute_node, test)) - results_list.append((compute_node, test, result)) + 'Test case for {0} with {1} FAILED on {2}.'.format( + node, out_plugin, test)) + results_list.append((compute_node, out_plugin, test, result)) def _print_label(label): @@ -333,22 +316,41 @@ def _print_final_result_of_plugin( """ print_line = '' for id in compute_ids: - if out_plugins[id] == out_plugin: - if (id, plugin, True) in results: + if out_plugin == 'Gnocchi': + if (id, out_plugin, plugin, True) in results: print_line += ' PASS |' - elif (id, plugin, False) in results \ - and out_plugins[id] == out_plugin: + elif (id, out_plugin, plugin, False) in results: + print_line += ' FAIL |' + else: + print_line += ' NOT EX |' + elif out_plugin == 'AODH': + if (id, out_plugin, plugin, True) in results: + print_line += ' PASS |' + elif (id, out_plugin, plugin, False) in results: + print_line += ' FAIL |' + else: + print_line += ' FAIL |' + elif out_plugin == 'SNMP': + if (id, out_plugin, plugin, True) in results: + print_line += ' PASS |' + elif (id, out_plugin, plugin, False) in results: + print_line += ' FAIL |' + else: + print_line += ' FAIL |' + elif out_plugin == 'CSV': + if (id, out_plugin, plugin, True) in results: + print_line += ' PASS |' + elif (id, out_plugin, plugin, False) in results: print_line += ' FAIL |' else: print_line += ' NOT EX |' - elif out_plugin == 'Gnocchi': - print_line += ' NOT EX |' else: - print_line += ' NOT EX |' + print_line += ' SKIP |' return print_line -def print_overall_summary(compute_ids, tested_plugins, results, out_plugins): +def print_overall_summary( + compute_ids, tested_plugins, aodh_plugins, results, out_plugins): """Print overall summary table. 
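With the refactor above, every entry appended by _process_result() is a (compute_id, out_plugin, test_label, result) tuple, and the summary table is produced by membership tests on those tuples. A short illustrative sketch (the tuple contents are hypothetical):

    # Hypothetical results in the (compute_id, out_plugin, test_label, result) shape.
    results = [
        ('node-1', 'Gnocchi', 'Hugepages', True),
        ('node-1', 'CSV', 'Mcelog', False),
    ]

    # A cell is printed as PASS only when the matching True tuple is present.
    cell = 'PASS' if ('node-1', 'Gnocchi', 'Hugepages', True) in results else 'FAIL'

    # Any falsy result means the overall run should be reported as a failure.
    failed = [r for r in results if not r[3]]
    print(cell, 'FAIL' if failed else 'PASS')
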
Keyword arguments: @@ -359,7 +361,6 @@ def print_overall_summary(compute_ids, tested_plugins, results, out_plugins): """ compute_node_names = ['Node-{}'.format(i) for i in range( len((compute_ids)))] - # compute_node_names = ['Node-{}'.format(id) for id in compute_ids] all_computes_in_line = '' for compute in compute_node_names: all_computes_in_line += '| ' + compute + (' ' * (7 - len(compute))) @@ -377,46 +378,60 @@ def print_overall_summary(compute_ids, tested_plugins, results, out_plugins): logger.info(line_of_nodes) logger.info( '+' + ('-' * 16) + '+' + (('-' * 8) + '+') * len(compute_node_names)) - out_plugins_print = ['Gnocchi'] - if 'SNMP' in out_plugins.values(): - out_plugins_print.append('SNMP') - if 'AODH' in out_plugins.values(): - out_plugins_print.append('AODH') - if 'CSV' in out_plugins.values(): - out_plugins_print.append('CSV') + out_plugins_print = [] + out_plugins_print1 = [] + for key in out_plugins.keys(): + if 'Gnocchi' in out_plugins[key]: + out_plugins_print1.append('Gnocchi') + if 'AODH' in out_plugins[key]: + out_plugins_print1.append('AODH') + if 'SNMP' in out_plugins[key]: + out_plugins_print1.append('SNMP') + if 'CSV' in out_plugins[key]: + out_plugins_print1.append('CSV') + for i in out_plugins_print1: + if i not in out_plugins_print: + out_plugins_print.append(i) for out_plugin in out_plugins_print: output_plugins_line = '' for id in compute_ids: - out_plugin_result = 'FAIL' + out_plugin_result = '----' if out_plugin == 'Gnocchi': out_plugin_result = \ - 'PASS' if out_plugins[id] == out_plugin else 'FAIL' - if out_plugin == 'AODH': - if out_plugins[id] == out_plugin: - out_plugin_result = \ - 'PASS' if out_plugins[id] == out_plugin else 'FAIL' - if out_plugin == 'SNMP': - if out_plugins[id] == out_plugin: - out_plugin_result = \ - 'PASS' if out_plugins[id] == out_plugin else 'FAIL' - if out_plugin == 'CSV': - if out_plugins[id] == out_plugin: - out_plugin_result = \ - 'PASS' if [ - plugin for comp_id, plugin, res in results - if comp_id == id and res] else 'FAIL' - else: - out_plugin_result = 'SKIP' + 'PASS' + elif out_plugin == 'AODH': + out_plugin_result = \ + 'PASS' + elif out_plugin == 'SNMP': + out_plugin_result = \ + 'PASS' + elif out_plugin == 'CSV': + out_plugin_result = \ + 'PASS' if [ + plugin for comp_id, out_pl, plugin, res in results + if comp_id == id and res] else 'FAIL' + else: + out_plugin_result = \ + 'FAIL' output_plugins_line += '| ' + out_plugin_result + ' ' logger.info( '| OUT:{}'.format(out_plugin) + (' ' * (11 - len(out_plugin))) + output_plugins_line + '|') - for plugin in sorted(tested_plugins.values()): - line_plugin = _print_final_result_of_plugin( - plugin, compute_ids, results, out_plugins, out_plugin) - logger.info( - '| IN:{}'.format(plugin) + (' ' * (11-len(plugin))) - + '|' + line_plugin) + + if out_plugin == 'AODH': + for plugin in sorted(aodh_plugins.values()): + line_plugin = _print_final_result_of_plugin( + plugin, compute_ids, results, out_plugins, out_plugin) + logger.info( + '| IN:{}'.format(plugin) + (' ' * (11-len(plugin))) + + '|' + line_plugin) + else: + for plugin in sorted(tested_plugins.values()): + line_plugin = _print_final_result_of_plugin( + plugin, compute_ids, results, out_plugins, out_plugin) + logger.info( + '| IN:{}'.format(plugin) + (' ' * (11-len(plugin))) + + '|' + line_plugin) logger.info( '+' + ('-' * 16) + '+' + (('-' * 8) + '+') * len(compute_node_names)) @@ -424,8 +439,8 @@ def print_overall_summary(compute_ids, tested_plugins, results, out_plugins): def _exec_testcase( - test_labels, name, 
gnocchi_running, aodh_running, snmp_running, - controllers, compute_node, conf, results, error_plugins, out_plugins): + test_labels, name, out_plugin, controllers, compute_node, + conf, results, error_plugins, out_plugins): """Execute the testcase. Keyword arguments: @@ -453,11 +468,8 @@ def _exec_testcase( bridge for bridge in ovs_interfaces if bridge in ovs_configured_bridges] plugin_prerequisites = { - 'intel_rdt': [( - conf.is_libpqos_on_node(compute_node), - 'libpqos must be installed.')], 'mcelog': [( - conf.is_installed(compute_node, 'mcelog'), + conf.is_mcelog_installed(compute_node, 'mcelog'), 'mcelog must be installed.')], 'ovs_events': [( len(ovs_existing_configured_int) > 0 or len(ovs_interfaces) > 0, @@ -466,28 +478,22 @@ def _exec_testcase( len(ovs_existing_configured_bridges) > 0, 'Bridges must be configured.')]} gnocchi_criteria_lists = { - 'hugepages': ['hugepages'], - 'mcelog': ['mcelog'], - 'ovs_events': ['interface-ovs-system'], - 'ovs_stats': ['ovs_stats-br0.br0']} + 'hugepages': 'hugepages', + 'intel_rdt': 'rdt', + 'mcelog': 'mcelog', + 'ovs_events': 'interface-ovs-system', + 'ovs_stats': 'ovs_stats-br0.br0'} aodh_criteria_lists = { - 'mcelog': ['mcelog.errors'], - 'ovs_events': ['ovs_events.gauge']} + 'mcelog': 'mcelog', + 'ovs_events': 'ovs_events'} snmp_mib_files = { 'intel_rdt': '/usr/share/snmp/mibs/Intel-Rdt.txt', 'hugepages': '/usr/share/snmp/mibs/Intel-Hugepages.txt', 'mcelog': '/usr/share/snmp/mibs/Intel-Mcelog.txt'} snmp_mib_strings = { - 'intel_rdt': [ - 'INTEL-RDT-MIB::rdtLlc.1', - 'INTEL-RDT-MIB::rdtIpc.1', - 'INTEL-RDT-MIB::rdtMbmRemote.1', - 'INTEL-RDT-MIB::rdtMbmLocal.1'], - 'hugepages': [ - 'INTEL-HUGEPAGES-MIB::hugepagesPageFree'], - 'mcelog': [ - 'INTEL-MCELOG-MIB::memoryCorrectedErrors.1', - 'INTEL-MCELOG-MIB::memoryCorrectedErrors.2']} + 'intel_rdt': 'INTEL-RDT-MIB::rdtLlc.1', + 'hugepages': 'INTEL-HUGEPAGES-MIB::hugepagesPageFree', + 'mcelog': 'INTEL-MCELOG-MIB::memoryCorrectedErrors.1'} nr_hugepages = int(time.time()) % 10000 snmp_in_commands = { 'intel_rdt': None, @@ -496,13 +502,10 @@ def _exec_testcase( 'mcelog': '/root/mce-inject_df < /root/corrected'} csv_subdirs = { 'intel_rdt': [ - 'intel_rdt-{}'.format(core) - for core in conf.get_plugin_config_values( - compute_node, 'intel_rdt', 'Cores')], + 'intel_rdt-0-2'], 'hugepages': [ 'hugepages-mm-2048Kb', 'hugepages-node0-2048Kb', - 'hugepages-node1-2048Kb', 'hugepages-mm-1048576Kb', - 'hugepages-node0-1048576Kb', 'hugepages-node1-1048576Kb'], + 'hugepages-node1-2048Kb'], # 'ipmi': ['ipmi'], 'mcelog': [ 'mcelog-SOCKET_0_CHANNEL_0_DIMM_any', @@ -515,19 +518,14 @@ def _exec_testcase( # compute_node) csv_meter_categories = { 'intel_rdt': [ - 'bytes-llc', 'ipc', 'memory_bandwidth-local', - 'memory_bandwidth-remote'], + 'bytes-llc', 'ipc'], 'hugepages': ['vmpage_number-free', 'vmpage_number-used'], # 'ipmi': csv_meter_categories_ipmi, 'mcelog': [ 'errors-corrected_memory_errors', - 'errors-uncorrected_memory_errors', - 'errors-corrected_memory_errors_in_24h', - 'errors-uncorrected_memory_errors_in_24h'], + 'errors-uncorrected_memory_errors'], 'ovs_stats': [ - 'if_collisions', 'if_dropped', 'if_errors', 'if_packets', - 'if_rx_errors-crc', 'if_rx_errors-frame', 'if_rx_errors-over', - 'if_rx_octets', 'if_tx_octets'], + 'if_dropped', 'if_errors', 'if_packets'], 'ovs_events': ['gauge-link_status']} _print_plugin_label( @@ -541,7 +539,8 @@ def _exec_testcase( for error in plugin_critical_errors: logger.error(' * ' + error) _process_result( - compute_node.get_id(), test_labels[name], False, results) + 
compute_node.get_id(), out_plugin, test_labels[name], False, + results, compute_node.get_name()) else: plugin_errors = [ error for plugin, error, critical in error_plugins @@ -563,35 +562,36 @@ def _exec_testcase( for prerequisite in failed_prerequisites: logger.error(' * {}'.format(prerequisite)) else: - if gnocchi_running: - plugin_interval = conf.get_plugin_interval(compute_node, name) + plugin_interval = conf.get_plugin_interval(compute_node, name) + if out_plugin == 'Gnocchi': res = conf.test_plugins_with_gnocchi( - compute_node.get_id(), plugin_interval, logger, - criteria_list=gnocchi_criteria_lists[name]) - elif aodh_running: + compute_node.get_name(), plugin_interval, + logger, criteria_list=gnocchi_criteria_lists[name]) + if out_plugin == 'AODH': res = conf.test_plugins_with_aodh( - compute_node.get_id(), plugin_interval, - logger, creteria_list=aodh_criteria_lists[name]) - elif snmp_running: + compute_node.get_name(), plugin_interval, + logger, criteria_list=aodh_criteria_lists[name]) + if out_plugin == 'SNMP': res = \ name in snmp_mib_files and name in snmp_mib_strings \ - and tests.test_snmp_sends_data( - compute_node, - conf.get_plugin_interval(compute_node, name), logger, - SNMPClient(conf, compute_node), snmp_mib_files[name], - snmp_mib_strings[name], snmp_in_commands[name], conf) - else: + and conf.test_plugins_with_snmp( + compute_node.get_name(), plugin_interval, logger, name, + snmp_mib_files[name], snmp_mib_strings[name], + snmp_in_commands[name]) + if out_plugin == 'CSV': res = tests.test_csv_handles_plugin_data( compute_node, conf.get_plugin_interval(compute_node, name), name, csv_subdirs[name], csv_meter_categories[name], logger, CSVClient(conf)) + if res and plugin_errors: logger.info( 'Test works, but will be reported as failure,' + 'because of non-critical errors.') res = False _process_result( - compute_node.get_id(), test_labels[name], res, results) + compute_node.get_id(), out_plugin, test_labels[name], + res, results, compute_node.get_name()) def get_results_for_ovs_events( @@ -614,48 +614,48 @@ def create_ovs_bridge(): APEX_USER_STACK, APEX_PKEY) nodes = handler.get_nodes() + logger.info("Creating OVS bridges on computes nodes") for node in nodes: if node.is_compute(): node.run_cmd('sudo ovs-vsctl add-br br0') node.run_cmd('sudo ovs-vsctl set-manager ptcp:6640') - logger.info('OVS Bridges created on compute nodes') + logger.info('OVS Bridges created on compute nodes') def mcelog_install(): """Install mcelog on compute nodes.""" - _print_label('Enabling mcelog on compute nodes') + _print_label('Enabling mcelog and OVS bridges on compute nodes') handler = factory.Factory.get_handler('apex', APEX_IP, APEX_USER_STACK, APEX_PKEY) nodes = handler.get_nodes() + mce_bin = os.path.dirname(os.path.realpath(__file__)) + '/mce-inject_ea' for node in nodes: if node.is_compute(): centos_release = node.run_cmd('uname -r') if '3.10.0-514.26.2.el7.x86_64' not in centos_release: logger.info( 'Mcelog will not be enabled ' - + 'on node-{0}, '.format(node.get_dict()['id']) + + 'on node-{0}, '.format(node.get_dict()['name']) + 'unsupported CentOS release found ({1}).'.format( centos_release)) else: logger.info( 'Checking if mcelog is enabled' - + ' on node-{}...'.format(node.get_dict()['id'])) + + ' on node-{}...'.format(node.get_dict()['name'])) res = node.run_cmd('ls') if 'mce-inject_ea' and 'corrected' in res: logger.info( 'Mcelog seems to be already installed ' - + 'on node-{}.'.format(node.get_dict()['id'])) + + 'on node-{}.'.format(node.get_dict()['name'])) 
node.run_cmd('sudo modprobe mce-inject') node.run_cmd('sudo ./mce-inject_ea < corrected') else: logger.info( 'Mcelog will be enabled on node-{}...'.format( node.get_dict()['id'])) - node.put_file( - '/usr/local/lib/python2.7/dist-packages/baro_tests/' - + 'mce-inject_ea', 'mce-inject_ea') + node.put_file(mce_bin, 'mce-inject_ea') node.run_cmd('chmod a+x mce-inject_ea') node.run_cmd('echo "CPU 0 BANK 0" > corrected') node.run_cmd( @@ -734,43 +734,24 @@ def main(bt_logger=None): _print_label( 'Display of Control and Compute nodes available in the set up') - logger.info('controllers: {}'.format([('{0}: {1} ({2})'.format( - node.get_id(), node.get_name(), - node.get_ip())) for node in controllers])) - logger.info('computes: {}'.format([('{0}: {1} ({2})'.format( - node.get_id(), node.get_name(), node.get_ip())) - for node in computes])) + logger.info('controllers: {}'.format([('{0}: {1}'.format( + node.get_name(), node.get_ip())) for node in controllers])) + logger.info('computes: {}'.format([('{0}: {1}'.format( + node.get_name(), node.get_ip())) for node in computes])) mcelog_install() create_ovs_bridge() gnocchi_running_on_con = False aodh_running_on_con = False + # Disabling SNMP write plug-in snmp_running = False - _print_label('Testing Gnocchi, AODH and SNMP on controller nodes') + _print_label('Testing Gnocchi and AODH plugins on nodes') for controller in controllers: - gnocchi_client = GnocchiClient() - gnocchi_client.auth_token() gnocchi_running = ( - gnocchi_running_on_con and conf.is_gnocchi_running(controller)) - aodh_client = AodhClient() - aodh_client.auth_token() + gnocchi_running_on_con or conf.is_gnocchi_running(controller)) aodh_running = ( - aodh_running_on_con and conf.is_aodh_running(controller)) - if gnocchi_running: - logger.info("Gnocchi is running on controller.") - elif aodh_running: - logger.error("Gnocchi is not running on controller.") - logger.info("AODH is running on controller.") - elif snmp_running: - logger.error("Gnocchi is not running on Controller") - logger.error("AODH is not running on controller.") - logger.info("SNMP is running on controller.") - else: - logger.error("Gnocchi is not running on Controller") - logger.error("AODH is not running on controller.") - logger.error("SNMP is not running on controller.") - logger.info("CSV will be enabled on compute nodes.") + aodh_running_on_con or conf.is_aodh_running(controller)) compute_ids = [] compute_node_names = [] @@ -782,118 +763,98 @@ def main(bt_logger=None): 'mcelog': 'Mcelog', 'ovs_stats': 'OVS stats', 'ovs_events': 'OVS events'} - out_plugins = { - 'gnocchi': 'Gnocchi', - 'aodh': 'AODH', - 'snmp': 'SNMP', - 'csv': 'CSV'} + aodh_plugin_labels = { + 'mcelog': 'Mcelog', + 'ovs_events': 'OVS events'} + out_plugins = {} for compute_node in computes: node_id = compute_node.get_id() node_name = compute_node.get_name() - out_plugins[node_id] = 'CSV' + out_plugins[node_id] = [] compute_ids.append(node_id) compute_node_names.append(node_name) plugins_to_enable = [] - _print_label('NODE {}: Test Gnocchi Plug-in'.format(node_name)) - logger.info('Checking if gnocchi plug-in is included in compute nodes.') - if not conf.check_gnocchi_plugin_included(compute_node): - logger.error('Gnocchi plug-in is not included.') - logger.info( - 'Testcases on node {} will not be executed'.format(node_name)) - else: - collectd_restarted, collectd_warnings = \ - conf.restart_collectd(compute_node) - sleep_time = 30 - logger.info( - 'Sleeping for {} seconds after collectd restart...'.format( - sleep_time)) - time.sleep(sleep_time) 
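The mcelog setup earlier in this hunk stages the mce-inject_ea binary and a minimal corrected-error record, then injects a corrected memory error so the plugin has data to report. A standalone sketch of that sequence, assuming the injector binary and the mce-inject kernel module are available on the host where it runs:

    import subprocess

    # Commands mirrored from the compute-node setup above; run locally as a sketch.
    commands = [
        'echo "CPU 0 BANK 0" > corrected',    # minimal corrected-error record
        'sudo modprobe mce-inject',           # load the injection module
        'sudo ./mce-inject_ea < corrected',   # inject a corrected memory error
    ]
    for cmd in commands:
        subprocess.run(cmd, shell=True, check=True)
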
- if not collectd_restarted: - for warning in collectd_warnings: - logger.warning(warning) + error_plugins = [] + gnocchi_running = ( + gnocchi_running and conf.check_gnocchi_plugin_included( + compute_node)) + aodh_running = ( + aodh_running and conf.check_aodh_plugin_included(compute_node)) + # logger.info("SNMP enabled on {}" .format(node_name)) + if gnocchi_running: + out_plugins[node_id].append("Gnocchi") + if aodh_running: + out_plugins[node_id].append("AODH") + if snmp_running: + out_plugins[node_id].append("SNMP") + + if 'Gnocchi' in out_plugins[node_id]: + plugins_to_enable.append('csv') + out_plugins[node_id].append("CSV") + if plugins_to_enable: + _print_label( + 'NODE {}: Enabling Test Plug-in '.format(node_name) + + 'and Test case execution') + if plugins_to_enable and not conf.enable_plugins( + compute_node, plugins_to_enable, error_plugins, + create_backup=False): logger.error( - 'Restart of collectd on node {} failed'.format(node_name)) + 'Failed to test plugins on node {}.'.format(node_id)) logger.info( 'Testcases on node {} will not be executed'.format( - node_name)) - else: - for warning in collectd_warnings: - logger.warning(warning) - - if gnocchi_running: - out_plugins[node_id] = 'Gnocchi' - logger.info("Gnocchi is active and collecting data") - elif aodh_running: - out_plugins[node_id] = 'AODH' - logger.info("AODH withh be tested") - _print_label('Node {}: Test AODH' .format(node_name)) - logger.info("Checking if AODH is running") - logger.info("AODH is running") - elif snmp_running: - out_plugins[node_id] = 'SNMP' - logger.info("SNMP will be tested.") - _print_label('NODE {}: Test SNMP'.format(node_id)) - logger.info("Checking if SNMP is running.") - logger.info("SNMP is running.") - else: - plugins_to_enable.append('csv') - out_plugins[node_id] = 'CSV' - logger.error("Gnocchi, AODH, SNMP are not running") - logger.info( - "CSV will be enabled for verification " - + "of test plugins.") - if plugins_to_enable: - _print_label( - 'NODE {}: Enabling Test Plug-in '.format(node_name) - + 'and Test case execution') - error_plugins = [] - if plugins_to_enable and not conf.enable_plugins( - compute_node, plugins_to_enable, error_plugins, - create_backup=False): + node_id)) + + for i in out_plugins[node_id]: + if i == 'AODH': + for plugin_name in sorted(aodh_plugin_labels.keys()): + _exec_testcase( + aodh_plugin_labels, plugin_name, i, + controllers, compute_node, conf, results, + error_plugins, out_plugins[node_id]) + elif i == 'CSV': + _print_label("Node {}: Executing CSV Testcases".format( + node_name)) + logger.info("Restarting collectd for CSV tests") + collectd_restarted, collectd_warnings = \ + conf.restart_collectd(compute_node) + sleep_time = 10 + logger.info( + 'Sleeping for {} seconds'.format(sleep_time) + + ' after collectd restart...') + time.sleep(sleep_time) + if not collectd_restarted: + for warning in collectd_warnings: + logger.warning(warning) logger.error( - 'Failed to test plugins on node {}.'.format(node_id)) + 'Restart of collectd on node {} failed'.format( + compute_node)) logger.info( - 'Testcases on node {} will not be executed'.format( - node_id)) - else: - if plugins_to_enable: - collectd_restarted, collectd_warnings = \ - conf.restart_collectd(compute_node) - sleep_time = 30 - logger.info( - 'Sleeping for {} seconds'.format(sleep_time) - + ' after collectd restart...') - time.sleep(sleep_time) - if plugins_to_enable and not collectd_restarted: - for warning in collectd_warnings: - logger.warning(warning) - logger.error( - 'Restart of collectd on 
node {} failed'.format( - node_id)) - logger.info( - 'Testcases on node {}'.format(node_id) - + ' will not be executed.') - else: - if collectd_warnings: - for warning in collectd_warnings: - logger.warning(warning) - - for plugin_name in sorted(plugin_labels.keys()): - _exec_testcase( - plugin_labels, plugin_name, gnocchi_running, - aodh_running, snmp_running, controllers, - compute_node, conf, results, error_plugins, - out_plugins[node_id]) - - # _print_label('NODE {}: Restoring config file'.format(node_name)) - # conf.restore_config(compute_node) - mcelog_delete() - print_overall_summary(compute_ids, plugin_labels, results, out_plugins) - - if ((len([res for res in results if not res[2]]) > 0) - or (len(results) < len(computes) * len(plugin_labels))): - logger.error('Some tests have failed or have not been executed') - return 1 + 'CSV Testcases on node {}'.format(compute_node) + + ' will not be executed.') + for plugin_name in sorted(plugin_labels.keys()): + _exec_testcase( + plugin_labels, plugin_name, i, + controllers, compute_node, conf, results, + error_plugins, out_plugins[node_id]) + + else: + for plugin_name in sorted(plugin_labels.keys()): + _exec_testcase( + plugin_labels, plugin_name, i, + controllers, compute_node, conf, results, + error_plugins, out_plugins[node_id]) + + mcelog_delete() + print_overall_summary( + compute_ids, plugin_labels, aodh_plugin_labels, results, out_plugins) + + for res in results: + if res[3] is 'False' or 'None': + logger.error('Some tests have failed or have not been executed') + logger.error('Overall Result is Fail') + return 1 + else: + pass return 0 diff --git a/baro_tests/config_server.py b/baro_tests/config_server.py index f156fcf7..f35f7882 100644 --- a/baro_tests/config_server.py +++ b/baro_tests/config_server.py @@ -19,7 +19,6 @@ import time import os.path import os import re -import subprocess from opnfv.deployment import factory ID_RSA_PATH = '/root/.ssh/id_rsa' SSH_KEYS_SCRIPT = '/home/opnfv/barometer/baro_utils/get_ssh_keys.sh' @@ -28,7 +27,7 @@ COLLECTD_CONF = '/etc/collectd.conf' COLLECTD_CONF_DIR = '/etc/collectd/collectd.conf.d' NOTIFICATION_FILE = '/var/log/python-notifications.dump' COLLECTD_NOTIFICATION = '/etc/collectd_notification_dump.py' -APEX_IP = subprocess.check_output("echo $INSTALLER_IP", shell=True) +APEX_IP = os.getenv("INSTALLER_IP").rstrip('\n') APEX_USER = 'root' APEX_USER_STACK = 'stack' APEX_PKEY = '/root/.ssh/id_rsa' @@ -101,20 +100,20 @@ class ConfigServer(object): stderr_lines = stderr.readlines() if stderr_lines: self.__logger.warning( - "'fuel node' command failed (try {}):".format(attempt)) + "'Apex node' command failed (try {}):".format(attempt)) for line in stderr_lines: self.__logger.debug(line.strip()) else: fuel_node_passed = True if attempt > 1: self.__logger.info( - "'fuel node' command passed (try {})".format(attempt)) + "'Apex node' command passed (try {})".format(attempt)) attempt += 1 if not fuel_node_passed: self.__logger.error( - "'fuel node' command failed. This was the last try.") + "'Apex node' command failed. This was the last try.") raise OSError( - "'fuel node' command failed. This was the last try.") + "'Apex node' command failed. 
This was the last try.") node_table = stdout.readlines()\ # skip table title and parse table values @@ -184,9 +183,10 @@ class ConfigServer(object): if compute_name == node.get_dict()['name']: stdout = node.run_cmd( 'cat /etc/collectd/collectd.conf.d/{}.conf'.format(plugin)) + if stdout is None: + return default_interval for line in stdout.split('\n'): if 'Interval' in line: - # line = line.strip('Interval') return 1 return default_interval @@ -206,6 +206,8 @@ class ConfigServer(object): if compute_name == node.get_dict()['name']: stdout = node.run_cmd( 'cat /etc/collectd/collectd.conf.d/{}.conf' .format(plugin)) + if stdout is None: + return default_values for line in stdout.split('\n'): if 'Interfaces' in line: return line.split(' ', 1)[1] @@ -257,28 +259,49 @@ class ConfigServer(object): Return boolean value whether Gnocchi is running. """ gnocchi_present = False - lines = self.execute_command( - 'source overcloudrc.v3;systemctl status openstack-gnocchi-api | ' - + 'grep running', controller.get_ip()) - for line in lines: - if '(running)' in line: - gnocchi_present = True + controller_name = controller.get_name() + nodes = get_apex_nodes() + for node in nodes: + if controller_name == node.get_dict()['name']: + node.put_file( + '/home/opnfv/functest/conf/openstack.creds', + 'overcloudrc.v3') + stdout = node.run_cmd( + "source overcloudrc.v3;" + + "openstack catalog list | grep gnocchi") + if stdout is None: + return False + elif 'gnocchi' in stdout: + gnocchi_present = True + return gnocchi_present + else: + return False return gnocchi_present def is_aodh_running(self, controller): """Check whether aodh service is running on controller """ aodh_present = False - lines = self.execute_command( - 'source overcloudrc.v3;systemctl openstack-aodh-api | grep running', - controller.get_ip()) - for line in lines: - self.__logger.info("Line = {}" .format(line)) - if '(running)' in line: - aodh_present = True + controller_name = controller.get_name() + nodes = get_apex_nodes() + for node in nodes: + if controller_name == node.get_dict()['name']: + node.put_file( + '/home/opnfv/functest/conf/openstack.creds', + 'overcloudrc.v3') + stdout = node.run_cmd( + "source overcloudrc.v3;" + + "openstack catalog list | grep aodh") + if stdout is None: + return False + elif 'aodh' in stdout: + aodh_present = True + return aodh_present + else: + return False return aodh_present - def is_installed(self, compute, package): + def is_mcelog_installed(self, compute, package): """Check whether package exists on compute node. Keyword arguments: @@ -292,8 +315,10 @@ class ConfigServer(object): for node in nodes: if compute_name == node.get_dict()['name']: stdout = node.run_cmd( - 'yum list installed | grep mcelog') - if 'mcelog' in stdout: + 'rpm -qa | grep mcelog') + if stdout is None: + return 0 + elif 'mcelog' in stdout: return 1 else: return 0 @@ -310,6 +335,32 @@ class ConfigServer(object): return True return False + def check_aodh_plugin_included(self, compute): + """Check if aodh plugin is included in collectd.conf file. + If not, try to enable it. + + Keyword arguments: + compute -- compute node instance + + Return boolean value whether AODH plugin is included + or it's enabling was successful. 
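is_gnocchi_running() and is_aodh_running() above no longer look at systemd units; they copy the overcloud credentials onto a controller and check whether the service appears in the Keystone catalog. A minimal local sketch of that detection, assuming an overcloudrc.v3 credentials file in the working directory and the OpenStack CLI installed:

    import subprocess

    def service_in_catalog(service_name, rc_file='overcloudrc.v3'):
        """Sketch: True when the service shows up in 'openstack catalog list'."""
        cmd = 'source {}; openstack catalog list | grep {}'.format(rc_file, service_name)
        out = subprocess.run(cmd, shell=True, capture_output=True, text=True,
                             executable='/bin/bash').stdout
        return service_name in out

    print(service_in_catalog('gnocchi'), service_in_catalog('aodh'))
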
+ """ + compute_name = compute.get_name() + nodes = get_apex_nodes() + for node in nodes: + if compute_name == node.get_dict()['name']: + aodh_conf = node.run_cmd('ls /etc/collectd/collectd.conf.d') + if 'aodh.conf' not in aodh_conf: + self.__logger.info( + "AODH Plugin not included in {}".format(compute_name)) + return False + else: + self.__logger.info( + "AODH plugin present in compute node {}" .format( + compute_name)) + return True + return True + def check_gnocchi_plugin_included(self, compute): """Check if gnocchi plugin is included in collectd.conf file. If not, try to enable it. @@ -324,16 +375,37 @@ class ConfigServer(object): nodes = get_apex_nodes() for node in nodes: if compute_name == node.get_dict()['name']: - # node.run_cmd('su; "opnfvapex"') gnocchi_conf = node.run_cmd('ls /etc/collectd/collectd.conf.d') if 'collectd-ceilometer-plugin.conf' not in gnocchi_conf: - self.__logger.info("Gnocchi Plugin not included") - return True + self.__logger.info( + "Gnocchi Plugin not included in node {}".format( + compute_name)) + return False else: - self.__logger.info("Gnochi plugin present") + self.__logger.info( + "Gnocchi plugin available in compute node {}" .format( + compute_name)) return True return True + def check_snmp_plugin_included(self, compute): + """Check if SNMP plugin is active in compute node. + """ + snmp_mib = '/usr/share/snmp/mibs/Intel-Rdt.txt' + snmp_string = 'INTEL-RDT-MIB::intelRdt' + compute_name = compute.get_name() + nodes = get_apex_nodes() + for node in nodes: + if compute_name == node.get_dict()['name']: + stdout = node.run_cmd( + 'snmpwalk -v2c -m {0} -c public localhost {1}' .format( + snmp_mib, snmp_string)) + self.__logger.info("snmp output = {}" .format(stdout)) + if 'OID' in stdout: + return False + else: + return True + def enable_plugins( self, compute, plugins, error_plugins, create_backup=True): """Enable plugins on compute node @@ -341,43 +413,21 @@ class ConfigServer(object): Keyword arguments: compute -- compute node instance plugins -- list of plugins to be enabled - error_plugins -- list of tuples with found errors, new entries - may be added there (plugin, error_description, is_critical): - plugin -- plug-in name - error_decription -- description of the error - is_critical -- boolean value indicating whether error - is critical - create_backup -- boolean value indicating whether backup - shall be created Return boolean value indicating whether function was successful. """ + csv_file = os.path.dirname(os.path.realpath(__file__)) + '/csv.conf' plugins = sorted(plugins) compute_name = compute.get_name() nodes = get_apex_nodes() for node in nodes: if compute_name == node.get_dict()['name']: - node.put_file( - '/usr/local/lib/python2.7/dist-packages/baro_tests/' - + 'csv.conf', 'csv.conf') + node.put_file(csv_file, 'csv.conf') node.run_cmd( 'sudo cp csv.conf ' + '/etc/collectd/collectd.conf.d/csv.conf') return True - def restore_config(self, compute): - """Restore collectd config file from backup on compute node. - - Keyword arguments: - compute -- compute node instance - """ - ssh, sftp = self.__open_sftp_session( - compute.get_ip(), 'root', 'opnfvapex') - - self.__logger.info('Restoring config file from backup...') - ssh.exec_command("cp {0} {0}.used".format(COLLECTD_CONF)) - ssh.exec_command("cp {0}.backup {0}".format(COLLECTD_CONF)) - def restart_collectd(self, compute): """Restart collectd on compute node. 
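check_snmp_plugin_included() above walks the Intel RDT MIB on the compute node and treats an 'OID' marker in the snmpwalk output (net-snmp's "No Such Object ... at this OID" message) as the snmp_agent plugin not being active. A local sketch of the same heuristic, assuming net-snmp and the Intel-Rdt.txt MIB are installed:

    import subprocess

    SNMP_MIB = '/usr/share/snmp/mibs/Intel-Rdt.txt'
    SNMP_STRING = 'INTEL-RDT-MIB::intelRdt'

    def snmp_plugin_active():
        """Sketch: the plugin is considered active when the walk returns values."""
        cmd = 'snmpwalk -v2c -m {0} -c public localhost {1}'.format(SNMP_MIB, SNMP_STRING)
        out = subprocess.run(cmd, shell=True, capture_output=True, text=True).stdout
        # An 'OID' marker in the output means the subtree is missing on the agent.
        return bool(out) and 'OID' not in out

    print(snmp_plugin_active())
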
@@ -419,142 +469,175 @@ class ConfigServer(object): return False, warning return True, warning - def test_gnocchi_is_sending_data(self, controller): - """ Checking if Gnocchi is sending metrics to controller""" - metric_ids = [] - timestamps1 = {} - timestamps2 = {} - ssh, sftp = self.__open_sftp_session( - controller.get_ip(), 'root', 'opnfvapex') - - self.__logger.info('Getting gnocchi metric list on{}'.format( - controller.get_name())) - stdout = self.execute_command( - "source overcloudrc.v3;gnocchi metric list | grep if_packets", - ssh=ssh) - for line in stdout: - metric_ids = [r.split('|')[1] for r in stdout] - self.__logger.info("Metric ids = {}" .format(metric_ids)) - for metric_id in metric_ids: - metric_id = metric_id.replace("u", "") - stdout = self.execute_command( - "source overcloudrc.v3;gnocchi measures show {}" .format( - metric_id), ssh=ssh) - self.__logger.info("stdout measures ={}" .format(stdout)) - for line in stdout: - if line[0] == '+': - pass - else: - self.__logger.info("Line = {}" .format(line)) - timestamps1 = [line.split('|')[1]] - self.__logger.info("Last line timetamp1 = {}" .format(timestamps1)) - time.sleep(10) - stdout = self.execute_command( - "source overcloudrc.v3;gnocchi measures show {}" .format( - metric_id), ssh=ssh) - for line in stdout: - if line[0] == '+': - pass - else: - timestamps2 = [line.split('|')[1]] - self.__logger.info("Last line timetamp2 = {}" .format(timestamps2)) - if timestamps1 == timestamps2: - self.__logger.info("False") - # return False - return True - else: - self.__logger.info("True") - return True + def test_plugins_with_aodh( + self, compute, plugin_interval, logger, + criteria_list=[]): - def test_plugins_with_aodh(self, controller): - """Checking if AODH is sending metrics to controller""" - metric_ids = [] + metric_id = {} timestamps1 = {} timestamps2 = {} - ssh, sftp = self.__open_sftp_session( - controller.get_ip(), 'root', 'opnfvapex') - self.__logger.info('Getting AODH alarm list on{}'.format( - controller.get_name())) - stdout = self.execute_command( - "source overcloudrc.v3;aodh alarm list | grep mcelog", - ssh=ssh) - for line in stdout: - metric_ids = [r.split('|')[1] for r in stdout] - self.__logger.info("Metric ids = {}" .format(metric_ids)) - for metric_id in metric_ids: - metric_id = metric_id.replace("u", "") - stdout = self.execute_command( - "source overcloudrc.v3;aodh alarm show {}" .format( - metric_id), ssh=ssh) - self.__logger.info("stdout alarms ={}" .format(stdout)) - for line in stdout: - if line[0] == '+': - pass - else: - self.__logger.info("Line = {}" .format(line)) - timestamps1 = [line.split('|')[1]] - self.__logger.info("Last line timetamp1 = {}" .format(timestamps1)) - time.sleep(10) - stdout = self.execute_command( - "source overcloudrc.v3;aodh alarm show {}" .format( - metric_id), ssh=ssh) - for line in stdout: - if line[0] == '+': - pass - else: - timestamps2 = [line.split('|')[1]] - self.__logger.info("Last line timetamp2 = {}" .format(timestamps2)) - if timestamps1 == timestamps2: - self.__logger.info("False") - # return False - return True - else: - self.__logger.info("True") - return True + nodes = get_apex_nodes() + for node in nodes: + if node.is_controller(): + self.__logger.info('Getting AODH Alarm list on {}' .format( + (node.get_dict()['name']))) + node.put_file( + '/home/opnfv/functest/conf/openstack.creds', + 'overcloudrc.v3') + stdout = node.run_cmd( + "source overcloudrc.v3;" + + "aodh alarm list | grep {0} | grep {1}" + .format(criteria_list, compute)) + if stdout is None: + 
self.__logger.info("aodh alarm list was empty") + return False + for line in stdout.splitlines(): + line = line.replace('|', "") + metric_id = line.split()[0] + stdout = node.run_cmd( + 'source overcloudrc.v3; aodh alarm show {}' .format( + metric_id)) + if stdout is None: + self.__logger.info("aodh alarm list was empty") + return False + for line in stdout.splitlines()[3: -1]: + line = line.replace('|', "") + if line.split()[0] == 'timestamp': + timestamps1 = line.split()[1] + else: + pass + time.sleep(12) + stdout = node.run_cmd( + "source overcloudrc.v3; aodh alarm show {}" .format( + metric_id)) + if stdout is None: + self.__logger.info("aodh alarm list was empty") + return False + for line in stdout.splitlines()[3:-1]: + line = line.replace('|', "") + if line.split()[0] == 'timestamp': + timestamps2 = line.split()[1] + else: + pass + if timestamps1 == timestamps2: + self.__logger.info( + "Data not updated after interval of 12 seconds") + return False + else: + self.__logger.info("PASS") + return True def test_plugins_with_gnocchi( - self, controller, compute_node, plugin_interval, logger, + self, compute, plugin_interval, logger, criteria_list=[]): - metric_ids = [] + metric_id = {} timestamps1 = {} timestamps2 = {} - ssh, sftp = self.__open_sftp_session( - controller.get_ip(), 'root', 'opnfvapex') - self.__logger.info('Getting gnocchi metric list on{}'.format( - controller.get_name())) - stdout = self.execute_command( - "source overcloudrc.v3;gnocchi metric list | grep {0} | grep {1}" - .format(compute_node.get_name(), criteria_list), ssh=ssh) - for line in stdout: - metric_ids = [r.split('|')[1] for r in stdout] - self.__logger.info("Metric ids = {}" .format(metric_ids)) - for metric_id in metric_ids: - metric_id = metric_id.replace("u", "") - stdout = self.execute_command( - "source overcloudrc.v3;gnocchi measures show {}" .format( - metric_id), ssh=ssh) - self.__logger.info("stdout measures ={}" .format(stdout)) - for line in stdout: - if line[0] == '+': - pass - else: - self.__logger.info("Line = {}" .format(line)) - timestamps1 = [line.split('|')[1]] - self.__logger.info("Last line timetamp1 = {}" .format(timestamps1)) - time.sleep(10) - stdout = self.execute_command( - "source overcloudrc.v3;gnocchi measures show {}" .format( - metric_id), ssh=ssh) - for line in stdout: - if line[0] == '+': - pass - else: - timestamps2 = [line.split('|')[1]] - self.__logger.info("Last line timetamp2 = {}" .format(timestamps2)) - if timestamps1 == timestamps2: - self.__logger.info("False") - return False - else: - self.__logger.info("True") - return True + nodes = get_apex_nodes() + sleep_time = plugin_interval + 2 + for node in nodes: + if node.is_controller(): + self.__logger.info('Getting gnocchi metric list on {}' .format( + (node.get_dict()['name']))) + node.put_file( + '/home/opnfv/functest/conf/openstack.creds', + 'overcloudrc.v3') + stdout = node.run_cmd( + "source overcloudrc.v3;" + + "gnocchi metric list | grep {0} | grep {1}" + .format(criteria_list, compute)) + if stdout is None: + self.__logger.info("gnocchi list was empty") + return False + for line in stdout.splitlines(): + line = line.replace('|', "") + metric_id = line.split()[0] + stdout = node.run_cmd( + 'source overcloudrc.v3;gnocchi measures show {}'.format( + metric_id)) + if stdout is None: + self.__logger.info("gnocchi list was empty") + return False + for line in stdout.splitlines()[3: -1]: + if line[0] == '+': + pass + else: + timestamps1 = line.replace('|', "") + timestamps1 = timestamps1.split()[0] + 
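Both test_plugins_with_aodh() and test_plugins_with_gnocchi() follow the same pattern: read the newest timestamp for a matching alarm or metric, wait slightly longer than the plugin interval, read it again, and pass only if the timestamp moved. A compact sketch of that pattern, with a hypothetical read_last_timestamp() callable standing in for the 'aodh alarm show' / 'gnocchi measures show' parsing:

    import time

    def measurements_are_fresh(read_last_timestamp, plugin_interval):
        """Sketch of the freshness check; read_last_timestamp is hypothetical."""
        first = read_last_timestamp()
        time.sleep(plugin_interval + 2)   # the Gnocchi path waits interval + 2s, AODH waits 12s
        second = read_last_timestamp()
        # An unchanged timestamp means collectd pushed no new sample in time.
        return first != second
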
time.sleep(sleep_time) + stdout = node.run_cmd( + "source overcloudrc.v3;gnocchi measures show {}".format( + metric_id)) + if stdout is None: + self.__logger.info("gnocchi measures was empty") + return False + for line in stdout.splitlines()[3:-1]: + if line[0] == '+': + pass + else: + timestamps2 = line.replace('|', "") + timestamps2 = timestamps2.split()[0] + if timestamps1 == timestamps2: + self.__logger.info( + "Plugin Interval is {}" .format(plugin_interval)) + self.__logger.info( + "Data not updated after {} seconds".format( + sleep_time)) + return False + else: + self.__logger.info("PASS") + return True + return False + + def test_plugins_with_snmp( + self, compute, plugin_interval, logger, plugin, snmp_mib_files=[], + snmp_mib_strings=[], snmp_in_commands=[]): + + if plugin == 'hugepages' or 'intel_rdt' or 'mcelog': + nodes = get_apex_nodes() + for node in nodes: + if compute == node.get_dict()['name']: + stdout = node.run_cmd( + 'snmpwalk -v2c -m {0} -c public localhost {1}' .format( + snmp_mib_files, snmp_mib_strings)) + self.__logger.info("{}" .format(stdout)) + if stdout is None: + self.__logger.info("No output from snmpwalk") + return False + elif 'OID' in stdout: + self.__logger.info("SNMP query failed") + return False + else: + counter1 = stdout.split()[3] + time.sleep(10) + stdout = node.run_cmd( + 'snmpwalk -v2c -m {0} -c public localhost {1}' .format( + snmp_mib_files, snmp_mib_strings)) + self.__logger.info("{}" .format(stdout)) + if stdout is None: + self.__logger.info("No output from snmpwalk") + elif 'OID' in stdout: + self.__logger.info( + "SNMP query failed during second check") + self.__logger.info("waiting for 10 sec") + time.sleep(10) + stdout = node.run_cmd( + 'snmpwalk -v2c -m {0} -c public localhost {1}' .format( + snmp_mib_files, snmp_mib_strings)) + self.__logger.info("{}" .format(stdout)) + if stdout is None: + self.__logger.info("No output from snmpwalk") + elif 'OID' in stdout: + self.__logger.info("SNMP query failed again") + self.__logger.info("Failing this test case") + return False + else: + counter2 = stdout.split()[3] + + if counter1 == counter2: + return False + else: + return True + else: + return False diff --git a/docs/release/configguide/index.rst b/docs/release/configguide/index.rst index 6305e357..57ff2787 100644 --- a/docs/release/configguide/index.rst +++ b/docs/release/configguide/index.rst @@ -1,3 +1,5 @@ +.. _barometer-configguide: + .. This work is licensed under a Creative Commons Attribution 4.0 International License. .. http://creativecommons.org/licenses/by/4.0 .. Copyright (c) 2017 Open Platform for NFV Project, Inc. and its contributors diff --git a/docs/release/release-notes/index.rst b/docs/release/release-notes/index.rst index 44713cd3..db8221ab 100644 --- a/docs/release/release-notes/index.rst +++ b/docs/release/release-notes/index.rst @@ -1,3 +1,5 @@ +.. _barometer-releasenotes: + .. This work is licensed under a Creative Commons Attribution 4.0 International License. .. http://creativecommons.org/licenses/by/4.0 .. (c) OPNFV, Intel Corporation and others. diff --git a/docs/release/scenarios/os-nosdn-bar-ha/index.rst b/docs/release/scenarios/os-nosdn-bar-ha/index.rst new file mode 100644 index 00000000..d1b18639 --- /dev/null +++ b/docs/release/scenarios/os-nosdn-bar-ha/index.rst @@ -0,0 +1,15 @@ +.. _os-nosdn-bar-ha: + +.. This work is licensed under a Creative Commons Attribution 4.0 International Licence. +.. http://creativecommons.org/licenses/by/4.0 +.. 
(c) <optionally add copywriters name> + +======================================== +os-nosdn-bar-ha overview and description +======================================== + +.. toctree:: + :numbered: + :maxdepth: 4 + + scenario.description.rst diff --git a/docs/release/scenarios/os-nosdn-bar-noha/index.rst b/docs/release/scenarios/os-nosdn-bar-noha/index.rst new file mode 100644 index 00000000..92851afa --- /dev/null +++ b/docs/release/scenarios/os-nosdn-bar-noha/index.rst @@ -0,0 +1,15 @@ +.. _os-nosdn-bar-noha: + +.. This work is licensed under a Creative Commons Attribution 4.0 International Licence. +.. http://creativecommons.org/licenses/by/4.0 +.. (c) <optionally add copywriters name> + +========================================== +os-nosdn-bar-noha overview and description +========================================== + +.. toctree:: + :numbered: + :maxdepth: 4 + + scenario.description.rst diff --git a/docs/release/userguide/feature.userguide.rst b/docs/release/userguide/feature.userguide.rst index 099d8e27..d61af7cd 100644 --- a/docs/release/userguide/feature.userguide.rst +++ b/docs/release/userguide/feature.userguide.rst @@ -321,7 +321,7 @@ To configure some hugepages: $ sudo mkdir -p /mnt/huge $ sudo mount -t hugetlbfs nodev /mnt/huge - $ sudo echo 14336 > /sys/devices/system/node/node0/hugepages/hugepages-2048kB/nr_hugepages + $ sudo bash -c "echo 14336 > /sys/devices/system/node/node0/hugepages/hugepages-2048kB/nr_hugepages" Building and installing collectd: @@ -514,41 +514,24 @@ Remove old version of OpenIPMI library: $ sudo yum remove OpenIPMI ipmitool -Download OpenIPMI library sources: +Build and install OpenIPMI library: .. code:: bash $ git clone https://git.code.sf.net/p/openipmi/code openipmi-code $ cd openipmi-code - -Patch the OpenIPMI pkg-config file to provide correct compilation flags -for collectd IPMI plugin: - -.. code:: diff - - diff --git a/OpenIPMIpthread.pc.in b/OpenIPMIpthread.pc.in - index 59b52e5..fffa0d0 100644 - --- a/OpenIPMIpthread.pc.in - +++ b/OpenIPMIpthread.pc.in - @@ -6,6 +6,6 @@ includedir=@includedir@ - Name: OpenIPMIpthread - Description: Pthread OS handler for OpenIPMI - Version: @VERSION@ - -Requires: OpenIPMI pthread - +Requires: OpenIPMI - Libs: -L${libdir} -lOpenIPMIutils -lOpenIPMIpthread - -Cflags: -I${includedir} - +Cflags: -I${includedir} -pthread - -Build and install OpenIPMI library: - -.. code:: bash - $ autoreconf --install $ ./configure --prefix=/usr $ make $ sudo make install +Add the directory containing ``OpenIPMI*.pc`` files to the ``PKG_CONFIG_PATH`` +environment variable: + +.. code:: bash + + export PKG_CONFIG_PATH=/usr/lib/pkgconfig + Enable IPMI support in the kernel: .. code:: bash @@ -924,9 +907,11 @@ On Centos 7: .. code:: bash $ sudo yum install net-snmp net-snmp-libs net-snmp-utils net-snmp-devel - $ systemctl start snmpd.service + $ sudo systemctl start snmpd.service -Or build from source +go to the `snmp configuration`_ steps. + +From source: Clone and build net-snmp: @@ -967,12 +952,14 @@ Configure snmpd as a service: $ systemctl enable snmpd.service $ systemctl start snmpd.service +.. _`snmp configuration`: + Add the following line to snmpd.conf configuration file -"/usr/share/snmp/snmpd.conf" to make all OID tree visible for SNMP clients: +``/etc/snmp/snmpd.conf`` to make all OID tree visible for SNMP clients: .. 
code:: bash - view systemonly included .1 + view systemview included .1 To verify that SNMP is working you can get IF-MIB table using SNMP client to view the list of Linux interfaces: @@ -981,11 +968,28 @@ to view the list of Linux interfaces: $ snmpwalk -v 2c -c public localhost IF-MIB::interfaces +Get the default MIB location: + +.. code:: bash + + $ net-snmp-config --default-mibdirs + /opt/stack/.snmp/mibs:/usr/share/snmp/mibs + +Install Intel specific MIBs (if needed) into location received by +``net-snmp-config`` command (e.g. ``/usr/share/snmp/mibs``). + +.. code:: bash + + $ git clone https://gerrit.opnfv.org/gerrit/barometer.git + $ sudo cp -f barometer/mibs/*.txt /usr/share/snmp/mibs/ + $ sudo systemctl restart snmpd.service + Clone and install the collectd snmp_agent plugin: .. code:: bash - $ git clone https://github.com/maryamtahhan/collectd + $ cd ~ + $ git clone https://github.com/maryamtahhan/collectd $ cd collectd $ git checkout feat_snmp $ ./build.sh @@ -1013,6 +1017,15 @@ memAvailReal OID to value represented as free memory type of memory plugin: </Data> </Plugin> + +The ``snmpwalk`` command can be used to validate the collectd configuration: + +.. code:: bash + + $ snmpwalk -v 2c -c public localhost 1.3.6.1.4.1.2021.4.6.0 + UCD-SNMP-MIB::memAvailReal.0 = INTEGER: 135237632 kB + + **Limitations** * Object instance with Counter64 type is not supported in SNMPv1. When GetNext diff --git a/docs/release/userguide/index.rst b/docs/release/userguide/index.rst index 287b16be..a6ec261f 100644 --- a/docs/release/userguide/index.rst +++ b/docs/release/userguide/index.rst @@ -1,3 +1,5 @@ +.. _barometer-userguide: + .. This work is licensed under a Creative Commons Attribution 4.0 International License. .. http://creativecommons.org/licenses/by/4.0 .. (c) Intel and OPNFV diff --git a/src/Makefile b/src/Makefile index 382acefc..e7a472c1 100644 --- a/src/Makefile +++ b/src/Makefile @@ -38,6 +38,6 @@ SUBDIRS += dpdk SUBDIRS += libpqos SUBDIRS += pmu-tools SUBDIRS += collectd -SUBDIRS += collectd-ceilometer-plugin +SUBDIRS += collectd-openstack-plugins include mk/make-subsys.mk diff --git a/src/collectd-ceilometer-plugin/Makefile b/src/collectd-openstack-plugins/Makefile index a8b8f938..96bbebb8 100644 --- a/src/collectd-ceilometer-plugin/Makefile +++ b/src/collectd-openstack-plugins/Makefile @@ -25,7 +25,7 @@ include ../package-list.mk .PHONY: force_make install -WORK_DIR = collectd-ceilometer-plugin +WORK_DIR = collectd-openstack-plugins TAG_DONE_FLAG = $(WORK_DIR)/.$(COLLECTD_OPENSTACK_TAG).tag.done all: force_make install @@ -59,5 +59,5 @@ sanity: @echo "Make sanity in $(WORK_DIR) (stub) " $(WORK_DIR): - $(AT)git clone $(COLLECTD_OPENSTACK_URL) + $(AT)git clone $(COLLECTD_OPENSTACK_URL) $(WORK_DIR) diff --git a/src/package-list.mk b/src/package-list.mk index 762b4230..e857dcf8 100644 --- a/src/package-list.mk +++ b/src/package-list.mk @@ -19,4 +19,4 @@ COLLECTD_URL ?= https://github.com/collectd/collectd COLLECTD_TAG ?= master COLLECTD_OPENSTACK_URL ?= https://github.com/openstack/collectd-ceilometer-plugin -COLLECTD_OPENSTACK_TAG ?= stable/ocata +COLLECTD_OPENSTACK_TAG ?= stable/pike |
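The Makefile and package-list changes at the end pin the OpenStack collectd plugins to the stable/pike branch and clone them into the renamed collectd-openstack-plugins work directory. A sketch of an equivalent manual checkout, using the URL and tag taken from package-list.mk:

    import subprocess

    # URL and tag as defined in src/package-list.mk; the target directory
    # mirrors the WORK_DIR used by the Makefile.
    url = 'https://github.com/openstack/collectd-ceilometer-plugin'
    tag = 'stable/pike'
    work_dir = 'collectd-openstack-plugins'

    subprocess.run(['git', 'clone', '--branch', tag, url, work_dir], check=True)
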