aboutsummaryrefslogtreecommitdiffstats
path: root/app/statistics/stats_consumer.py
blob: e0a7d4635ec8fccd2b19c25c12dad69eb06e7bea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
###############################################################################
# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems)   #
# and others                                                                  #
#                                                                             #
# All rights reserved. This program and the accompanying materials            #
# are made available under the terms of the Apache License, Version 2.0       #
# which accompanies this distribution, and is available at                    #
# http://www.apache.org/licenses/LICENSE-2.0                                  #
###############################################################################

import argparse
import json
import time

from kafka import KafkaConsumer

from discover.configuration import Configuration
from utils.inventory_mgr import InventoryMgr
from utils.logging.full_logger import FullLogger
from utils.mongo_access import MongoAccess


class StatsConsumer(MongoAccess):
    default_env = "WebEX-Mirantis@Cisco"

    def __init__(self):
        self.get_args()
        MongoAccess.set_config_file(self.args.mongo_config)
        MongoAccess.__init__(self)
        self.log = FullLogger()
        self.log.set_loglevel(self.args.loglevel)
        self.conf = Configuration()
        self.inv = InventoryMgr()
        self.inv.set_collections(self.args.inventory)
        stats_coll = self.inv.get_coll_name('statistics')
        self.stats = self.db[stats_coll]
        # consume messages from topic
        self.consumer = KafkaConsumer('VPP.stats',
                                      group_id='calipso_test',
                                      auto_offset_reset=self.args.offset,
                                      bootstrap_servers=['localhost:9092'])

    def get_args(self):
        # try to read scan plan from command line parameters
        parser = argparse.ArgumentParser()
        parser.add_argument("-m", "--mongo_config", nargs="?", type=str,
                            default="",
                            help="name of config file " +
                            "with MongoDB servr access details")
        parser.add_argument("-e", "--env", nargs="?", type=str,
                            default=self.default_env,
                            help="name of environment to scan \n" +
                            "(default: " + self.default_env + ")")
        parser.add_argument("-y", "--inventory", nargs="?", type=str,
                            default="inventory",
                            help="name of inventory collection \n" +
                            "(default: 'inventory')")
        parser.add_argument("-l", "--loglevel", nargs="?", type=str,
                            default="INFO",
                            help="logging level \n(default: 'INFO')")
        parser.add_argument("-o", "--offset", nargs="?", type=str,
                            default="largest",
                            help="where to start reading" +
                                 " - use 'smallest' for start \n" +
                                 "(default: 'largest')")
        self.args = parser.parse_args()

    def read(self):
        for kafka_msg in self.consumer:
            msg = json.loads(kafka_msg.value.decode())
            self.add_stats(msg)

    def add_stats(self, msg):
        host_ip = msg['hostIp']
        search = {
            'environment': self.args.env,
            'type': 'host',
            'ip_address': host_ip
        }
        host = self.inv.find_items(search, get_single=True)
        if not host:
            self.log.error('could not find host with ip address=' + host_ip)
            return
        host_id = host['id']
        search = {
            'environment': self.args.env,
            'type': 'vedge',
            'host': host_id
        }
        vedge = self.inv.find_items(search, get_single=True)
        if not vedge:
            self.log.error('could not find vEdge for host: ' + host_id)
            return
        self.log.info('setting VPP stats for vEdge of host: ' + host_id)
        self.add_stats_for_object(vedge, msg)

    def add_stats_for_object(self, o, msg):
        msg['type'] = 'vedge_flows'
        msg['environment'] = self.args.env
        msg['object_type'] = o['type']
        msg['object_id'] = o['id']
        time_seconds = int(msg['averageArrivalNanoSeconds'] / 1000000000)
        sample_time = time.gmtime(time_seconds)
        msg['sample_time'] = time.strftime("%Y-%m-%dT%H:%M:%SZ", sample_time)
        # find instances between which the flow happens
        # to find the instance, find the related vNIC first
        msg['source'] = self.find_instance_for_stat('source', msg)
        msg['destination'] = self.find_instance_for_stat('destination', msg)
        self.stats.insert_one(msg)

    def find_instance_for_stat(self, direction, msg):
        search_by_mac_address = 'sourceMacAddress' in msg
        value_attr = 'MacAddress' if search_by_mac_address else 'IpAddress'
        value_to_search = msg[direction + value_attr]
        attr = 'mac_address' if search_by_mac_address else 'ip_address'
        search = {
            'environment': self.args.env,
            'type': 'vnic',
            attr: value_to_search
        }
        vnic = self.inv.find_items(search, get_single=True)
        if not vnic:
            self.log.error('failed to find vNIC for ' +
                           attr + '=' + value_to_search)
            return 'Unknown'
        # now find the instance name from the vnic name
        name_path = vnic['name_path'].split('/')
        instance_name = name_path[8]
        return instance_name

if __name__ == '__main__':
    stats_consumer = StatsConsumer()
    stats_consumer.read()