#!/usr/bin/env python # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import sys import base64 import logging import argparse try: import configparser except ImportError: import ConfigParser as configparser from distutils.util import strtobool from kafka import KafkaConsumer from normalizer import Normalizer from normalizer import CollectdValue try: # For Python 3.0 and later import urllib.request as url except ImportError: # Fall back to Python 2's urllib2 import urllib2 as url class VESApp(Normalizer): """VES Application""" def __init__(self): """Application initialization""" self._app_config = { 'Domain': '127.0.0.1', 'Port': 30000, 'Path': '', 'Username': '', 'Password': '', 'Topic': '', 'UseHttps': False, 'SendEventInterval': 20.0, 'ApiVersion': 5.1, 'KafkaPort': 9092, 'KafkaBroker': 'localhost' } def send_data(self, event): """Send event to VES""" server_url = "http{}://{}:{}{}/eventListener/v{}{}".format( 's' if self._app_config['UseHttps'] else '', self._app_config['Domain'], int(self._app_config['Port']), '{}'.format('/{}'.format(self._app_config['Path']) if len( self._app_config['Path']) > 0 else ''), int(self._app_config['ApiVersion']), '{}'.format( '/{}'.format(self._app_config['Topic']) if len( self._app_config['Topic']) > 0 else '')) logging.info('Vendor Event Listener is at: {}'.format(server_url)) credentials = base64.b64encode('{}:{}'.format( self._app_config['Username'], self._app_config['Password']).encode()).decode() logging.info('Authentication credentials are: {}'.format(credentials)) try: request = url.Request(server_url) request.add_header('Authorization', 'Basic {}'.format(credentials)) request.add_header('Content-Type', 'application/json') event_str = json.dumps(event).encode() logging.debug("Sending {} to {}".format(event_str, server_url)) url.urlopen(request, event_str, timeout=1) logging.debug("Sent data to {} successfully".format(server_url)) except url.HTTPError as e: logging.error('Vendor Event Listener exception: {}'.format(e)) except url.URLError as e: logging.error( 'Vendor Event Listener is is not reachable: {}'.format(e)) except Exception as e: logging.error('Vendor Event Listener error: {}'.format(e)) def config(self, config): """VES option configuration""" for key, value in config.items('config'): if key in self._app_config: try: if type(self._app_config[key]) == int: value = int(value) elif type(self._app_config[key]) == float: value = float(value) elif type(self._app_config[key]) == bool: value = bool(strtobool(value)) if isinstance(value, type(self._app_config[key])): self._app_config[key] = value else: logging.error("Type mismatch with %s" % key) sys.exit() except ValueError: logging.error("Incorrect value type for %s" % key) sys.exit() else: logging.error("Incorrect key configuration %s" % key) sys.exit() def init(self, configfile, schema_file): if configfile is not None: # read VES configuration file if provided config = configparser.ConfigParser() config.optionxform = lambda option: option config.read(configfile) self.config(config) # initialize normalizer self.initialize(schema_file, self._app_config['SendEventInterval']) def run(self): """Consumer JSON data from kafka broker""" kafka_server = '{}:{}'.format( self._app_config.get('KafkaBroker'), self._app_config.get('KafkaPort')) consumer = KafkaConsumer( 'collectd', bootstrap_servers=kafka_server, auto_offset_reset='latest', enable_auto_commit=False, value_deserializer=lambda m: json.loads(m.decode('ascii'))) for message in consumer: for kafka_data in message.value: # { # u'dstypes': [u'derive'], # u'plugin': u'cpu', # u'dsnames': [u'value'], # u'interval': 10.0, # u'host': u'localhost', # u'values': [99.9978996416267], # u'time': 1502114956.244, # u'plugin_instance': u'44', # u'type_instance': u'idle', # u'type': u'cpu' # } logging.debug('{}:run():data={}'.format( self.__class__.__name__, kafka_data)) for ds_name in kafka_data['dsnames']: index = kafka_data['dsnames'].index(ds_name) val_hash = CollectdValue.hash_gen( kafka_data['host'], kafka_data['plugin'], kafka_data['plugin_instance'], kafka_data['type'], kafka_data['type_instance'], ds_name) collector = self.get_collector() val = collector.get(val_hash) if val: # update the value val.value = kafka_data['values'][index] val.time = kafka_data['time'] del(val) else: # add new value into the collector val = CollectdValue() val.host = kafka_data['host'] val.plugin = kafka_data['plugin'] val.plugin_instance = kafka_data['plugin_instance'] val.type = kafka_data['type'] val.type_instance = kafka_data['type_instance'] val.value = kafka_data['values'][index] val.interval = kafka_data['interval'] val.time = kafka_data['time'] val.ds_name = ds_name collector.add(val) def main(): # Parsing cmdline options parser = argparse.ArgumentParser() parser.add_argument("--events-schema", dest="schema", required=True, help="YAML events schema definition", metavar="FILE") parser.add_argument("--config", dest="configfile", default=None, help="Specify config file", metavar="FILE") parser.add_argument("--loglevel", dest="level", default='INFO', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], help="Specify log level (default: %(default)s)", metavar="LEVEL") parser.add_argument("--logfile", dest="logfile", default='ves_app.log', help="Specify log file (default: %(default)s)", metavar="FILE") args = parser.parse_args() # Create log file logging.basicConfig(filename=args.logfile, format='%(asctime)s %(message)s', level=args.level) if args.configfile is None: logging.warning("No configfile specified, using default options") # Create Application Instance application_instance = VESApp() application_instance.init(args.configfile, args.schema) try: # Run the plugin application_instance.run() except KeyboardInterrupt: logging.info(" - Ctrl-C handled, exiting gracefully") except Exception as e: logging.error('{}, {}'.format(type(e), e)) finally: application_instance.destroy() sys.exit() if __name__ == '__main__': main()