aboutsummaryrefslogtreecommitdiffstats
path: root/app/statistics/stats_consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'app/statistics/stats_consumer.py')
-rwxr-xr-xapp/statistics/stats_consumer.py134
1 files changed, 134 insertions, 0 deletions
diff --git a/app/statistics/stats_consumer.py b/app/statistics/stats_consumer.py
new file mode 100755
index 0000000..e0a7d46
--- /dev/null
+++ b/app/statistics/stats_consumer.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python3
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+
+import argparse
+import json
+import time
+
+from kafka import KafkaConsumer
+
+from discover.configuration import Configuration
+from utils.inventory_mgr import InventoryMgr
+from utils.logging.full_logger import FullLogger
+from utils.mongo_access import MongoAccess
+
+
+class StatsConsumer(MongoAccess):
+ default_env = "WebEX-Mirantis@Cisco"
+
+ def __init__(self):
+ self.get_args()
+ MongoAccess.set_config_file(self.args.mongo_config)
+ MongoAccess.__init__(self)
+ self.log = FullLogger()
+ self.log.set_loglevel(self.args.loglevel)
+ self.conf = Configuration()
+ self.inv = InventoryMgr()
+ self.inv.set_collections(self.args.inventory)
+ stats_coll = self.inv.get_coll_name('statistics')
+ self.stats = self.db[stats_coll]
+ # consume messages from topic
+ self.consumer = KafkaConsumer('VPP.stats',
+ group_id='calipso_test',
+ auto_offset_reset=self.args.offset,
+ bootstrap_servers=['localhost:9092'])
+
+ def get_args(self):
+ # try to read scan plan from command line parameters
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-m", "--mongo_config", nargs="?", type=str,
+ default="",
+ help="name of config file " +
+ "with MongoDB servr access details")
+ parser.add_argument("-e", "--env", nargs="?", type=str,
+ default=self.default_env,
+ help="name of environment to scan \n" +
+ "(default: " + self.default_env + ")")
+ parser.add_argument("-y", "--inventory", nargs="?", type=str,
+ default="inventory",
+ help="name of inventory collection \n" +
+ "(default: 'inventory')")
+ parser.add_argument("-l", "--loglevel", nargs="?", type=str,
+ default="INFO",
+ help="logging level \n(default: 'INFO')")
+ parser.add_argument("-o", "--offset", nargs="?", type=str,
+ default="largest",
+ help="where to start reading" +
+ " - use 'smallest' for start \n" +
+ "(default: 'largest')")
+ self.args = parser.parse_args()
+
+ def read(self):
+ for kafka_msg in self.consumer:
+ msg = json.loads(kafka_msg.value.decode())
+ self.add_stats(msg)
+
+ def add_stats(self, msg):
+ host_ip = msg['hostIp']
+ search = {
+ 'environment': self.args.env,
+ 'type': 'host',
+ 'ip_address': host_ip
+ }
+ host = self.inv.find_items(search, get_single=True)
+ if not host:
+ self.log.error('could not find host with ip address=' + host_ip)
+ return
+ host_id = host['id']
+ search = {
+ 'environment': self.args.env,
+ 'type': 'vedge',
+ 'host': host_id
+ }
+ vedge = self.inv.find_items(search, get_single=True)
+ if not vedge:
+ self.log.error('could not find vEdge for host: ' + host_id)
+ return
+ self.log.info('setting VPP stats for vEdge of host: ' + host_id)
+ self.add_stats_for_object(vedge, msg)
+
+ def add_stats_for_object(self, o, msg):
+ msg['type'] = 'vedge_flows'
+ msg['environment'] = self.args.env
+ msg['object_type'] = o['type']
+ msg['object_id'] = o['id']
+ time_seconds = int(msg['averageArrivalNanoSeconds'] / 1000000000)
+ sample_time = time.gmtime(time_seconds)
+ msg['sample_time'] = time.strftime("%Y-%m-%dT%H:%M:%SZ", sample_time)
+ # find instances between which the flow happens
+ # to find the instance, find the related vNIC first
+ msg['source'] = self.find_instance_for_stat('source', msg)
+ msg['destination'] = self.find_instance_for_stat('destination', msg)
+ self.stats.insert_one(msg)
+
+ def find_instance_for_stat(self, direction, msg):
+ search_by_mac_address = 'sourceMacAddress' in msg
+ value_attr = 'MacAddress' if search_by_mac_address else 'IpAddress'
+ value_to_search = msg[direction + value_attr]
+ attr = 'mac_address' if search_by_mac_address else 'ip_address'
+ search = {
+ 'environment': self.args.env,
+ 'type': 'vnic',
+ attr: value_to_search
+ }
+ vnic = self.inv.find_items(search, get_single=True)
+ if not vnic:
+ self.log.error('failed to find vNIC for ' +
+ attr + '=' + value_to_search)
+ return 'Unknown'
+ # now find the instance name from the vnic name
+ name_path = vnic['name_path'].split('/')
+ instance_name = name_path[8]
+ return instance_name
+
+if __name__ == '__main__':
+ stats_consumer = StatsConsumer()
+ stats_consumer.read()