# Data-Query
This notebook is provided as tool to extract data, based on data-model from thoth, from multiple sources.

## Traget Sources
Phase-1:
1. Prometheus
2. Elasticsearch

#########################################################################

Copyright 2021 Spirent Communications

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 
#########################################################################

### Import required packages

In [43]:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
from IPython.display import display

import datetime
import time
import requests

from pprint import pprint
import json
from datetime import datetime, timedelta

#from elasticsearch import Elasticsearch
#from elasticsearch_dsl import Search
#from elasticsearch.connection import create_ssl_context
import ssl
import urllib3

### Sources

### Helper Functions

In [15]:
#function to make DF out of query json

def convert_to_df(res_json):

    data_list = res_json['data']['result']
    res_df = pd.DataFrame()
    if not data_list:
        return res_df

    # making colums
    headers = data_list[0]
    for data in data_list:
        metrics = data['metric']
        for metric in metrics.keys():
            res_df[metric] = np.nan
        res_df['value'] = 0
    
    # filling the df
    for data in data_list:
        metrics = data['metric']
        metrics['value'] = data['value'][-1]
        res_df = res_df.append(metrics, ignore_index=True)      

    return res_df

def convert_to_df_range(res_json):

    data_list = res_json['data']['result']
    res_df = pd.DataFrame()
    if not data_list:
        return res_df

    # filling the df
    for data in data_list:
        metrics = data['metric']
        values = np.array(data['values'])
        for time, value in values:
            metrics['timestamp'] = time
            metrics['value'] = value
            res_df = res_df.append(metrics, ignore_index=True)      

    return res_df

In [9]:
# functions to query

def convert_to_timestamp(s):
    return time.mktime(datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())

def query_current(params={}):
    # input: params
    # type: dict
    # Example: {'query': 'container_cpu_user_seconds_total'}   
    # Output: dict, loaded json response of the query

    res = requests.get(PROMETHEUS + '/api/v1/query', 
                       params=params)
    return json.loads(res.text)


def query_range(start, end, params={}, steps = '30s'):
    # input: params
    # type: dict
    # Example: {'query': 'container_cpu_user_seconds_total'}
    
    # Output: dict, loaded json response of the query
    params["start"] = convert_to_timestamp(start)
    params["end"] = convert_to_timestamp(end)
    params["step"] = steps

    # print(params)

    res = requests.get(PROMETHEUS + '/api/v1/query_range', 
                       params=params,
                       )

    return json.loads(res.text)

In [10]:
# Elasticsearch [WIP]
# https://github.com/elastic/elasticsearch-dsl-py 
def query_es_range(start, end, indx, cat, titl, desc):
    es = Elasticsearch([ELASTICSEARCH])
    # Build a DSL Search object on the 'commits' index, 'summary' document type
    request = elasticsearch_dsl.Search(using=es, index='commits',
                                       doc_type='summary')
    # Run the Search, using the scan interface to get all resuls
    response = request.scan() # or execute()?

### Data Extractions as Dataframes

In [47]:
#https://github.com/Naugrimm/promql-examples
def cpu_core_utilization(start=None, end=None, node=None, steps='15s', csv=None, verbose=False):
    # If total is not available:
    #params = {'query' : "100 * (1 - sum(collectd_cpu{type='idle'}) by (cpu) / sum(collectd_cpu{exported_instance='" + node + "'}) by (cpu))"}
    params = {'query': "100 * (1 - sum(collectd_cpu_total{type='idle'}) by (exported_instance) / sum(collectd_cpu_total) by (exported_instance))"}
    target_cpu_usage_range = query_range(start, end, params, steps)
    current_cpu = query_current(params)
    df = convert_to_df_range(target_cpu_usage_range)
    display(df)
    return df

In [66]:
# Interface Dropped (both type 1 and 2, i.e rx and tx)
#TODO: Change this to separate functions later
def interface_dropped(start=None, end=None, node=None, steps='15s', csv=None, verbose=False):
    drop_dfs = []
    if csv is not None:
        df = pd.read_csv(csv)
        df_0 = df #TODO: Change this
        df_1 = df #TODO: Change this
    else:
        if start is None or end is None or node is None:
            return "Start, end and Node name required when fetching from prometheus"
        
        params = {'query' : "collectd_interface_if_dropped_0{exported_instance='" + node + "'}"}

        interface_dropped_0 = query_range(start, end, params, steps)
        df_0 = convert_to_df_range(interface_dropped_0)
        
        params = {'query' : "collectd_interface_if_dropped_1{exported_instance='" + node + "'}"}
        interface_dropped_1 = query_range(start, end, params, steps)
        df_1 = convert_to_df_range(interface_dropped_1)

        #df_0 : interfaces_dropped_0_df
        #df_0 = df_0.drop(['__name__', 'instance', 'job'], axis = 1)
        drop_dfs.append(df_0)

        #df_1 : interfaces_dropped_1_df
        #df_1 = df_1.drop(['__name__', 'instance', 'job'], axis = 1)
        drop_dfs.append(df_1)

    return drop_dfs

def virt_interface_dropped(start=None, end=None, node=None, steps='15s', csv=None, verbose=False):
    drop_dfs = []
    if csv is not None:
        df = pd.read_csv(csv)
        df_0 = df #TODO: Change this
        df_1 = df #TODO: Change this
    else:
        if start is None or end is None or node is None:
            return "Start, end and Node name required when fetching from prometheus"
        
        params = {'query' : "collectd_virt_if_dropped_0{exported_instance='" + node + "'}"}
          
        interface_dropped_0 = query_range(start, end, params, steps)
        df_0 = convert_to_df_range(interface_dropped_0)
        
        params = {'query' : "collectd_virt_if_dropped_0{exported_instance='" + node + "'}"}
        interface_dropped_1 = query_range(start, end, params, steps)
        df_1 = convert_to_df_range(interface_dropped_1)

        #df_0 : interfaces_dropped_0_df
        df_0 = df_0.drop(['instance', 'job'], axis = 1)
        drop_dfs.append(df_0)

        #df_1 : interfaces_dropped_1_df
        df_1 = df_1.drop(['instance', 'job'], axis = 1)
        drop_dfs.append(df_1)

    return drop_dfs


# Interface Errors (both type 1 and 2, i.e rx and tx)
#TODO: Change this to separate functions later
def interface_errors(start=None, end=None, node=None, steps='15s', csv=None, verbose=False):
    error_dfs = []
    if csv is not None:
        df = pd.read_csv(csv)
        df_0 = df #TODO: Change this
        df_1 = df #TODO: Change this
    else:
        if start is None or end is None or node is None:
            return "Start, end and Node name required when fetching from prometheus"
        
        params = {'query' : "collectd_interface_if_errors_0_total{exported_instance='" + node + "'}"}
        interfaces_errors_0 = query_range(start, end, params, steps)
        df_0 = convert_to_df_range(interfaces_errors_0)
        
        params = {'query' : "collectd_interface_if_errors_1_total{exported_instance='" + node + "'}"}
        interface_errors_1 = query_range(start, end, params, steps)
        df_1 = convert_to_df_range(interface_errors_1)

        
        #df_0 : interfaces_errors_0_df
        #df_0 = df_0.drop(['__name__', 'instance', 'job'], axis = 1)
        error_dfs.append(df_0)

        #df_1 : interfaces_dropped_1_df
        #df_1 = df_1.drop(['__name__', 'instance', 'job'], axis = 1)
        error_dfs.append(df_1)

    return error_dfs

In [71]:
def get_memory_usage(start=None, end=None, node=None, steps='15s', csv=None, verbose=False):
    
    if csv is not None:
        df = pd.read_csv(csv)
    else:
        if start is None or end is None or node is None:
            return "Start, end and Node name required when fetching from prometheus"
        
        params = {'query' : "collectd_memory{exported_instance='" + node + "'} / (1024*1024*1024) "}        
        target_memory_usage_range = query_range(start, end, params, steps)
        df = convert_to_df_range(target_memory_usage_range)    
        #df = df.drop(['instance', 'job'], axis = 1)
    return df

def get_virt_memory_usage(start=None, end=None, node=None, steps='15s', csv=None, verbose=False):
    # Example value: collectd_virt_memory{exported_instance="instance-00001a33", 
    # instance="localhost:9103",job="promc",virt="total"}
    if csv is not None:
        df = pd.read_csv(csv)
    else:
        if start is None or end is None or node is None:
            return "Start, end and Node name required when fetching from prometheus"
        
        params = {'query' : "collectd_virt_memory{exported_instance='" + node + "'} / (1024*1024*1024) "}        
        target_memory_usage_range = query_range(start, end, params, steps)
        df = convert_to_df_range(target_memory_usage_range)
        
        #df = df.drop(['instance', 'job'], axis = 1)
    return df

### Main Extraction

In [73]:
def extract(timestamp, node=None, node_phy=True):
    virt = True
    # TODO: If node is provided, we may have to do some validations
    # If its a valid-node node of not.
    end = datetime.strptime(timestamp.split(',')[0], "%Y-%m-%d %H:%M:%S")
    start = end - timedelta(days=1)
    
    start = str(start)
    end = str(end)
    steps = '60s'

    print("Starting Extraction from",start,"to",end,'\n\n')
    #cpu
    print("===== CPU Data =====\n")
    cpu_utilization = cpu_core_utilization(start, end, steps=steps)
    display(cpu_utilization)
      
   
    #interface analysis
    print("=====Interfaces (including libvirt) Dropped / Errors=====\n")
    dropped_interfaces = interface_dropped(start=start, end=end, node=node, steps=steps)
    for drop_if in dropped_interfaces:
        display(drop_if)
    err_interfaces = interface_errors(start=start, end=end, node=node, steps=steps)
    for err_if in err_interfaces:
        display(err_if)
    print(err_interfaces)
    #vdropped_interfaces = virt_interface_dropped(start,end,node,steps)
    
       
    #Memory Analysis:
    print("=====Memory Data =====\n")
    mem = get_memory_usage(start, end, node, steps)
    display(mem)
    vmem = get_virt_memory_usage(start, end, node, steps)
    display(vmem)

In [74]:
PROMETHEUS = 'http://10.10.180.20:9090/'
ELASTICSEARCH = 'http://192.168.2.233:9200/'

In [75]:
# Call to extract any Node or all nodes...
# Default start is 1 days before the timestamp provided.
# Please modify in the extract function if you want anything lesser or more.
extract('2022-04-12 06:00:00', 'pod18-node4')
#extract('2021-09-03 18:45:00', node="MY-NODE-1")

Starting Extraction from 2022-04-11 06:00:00 to 2022-04-12 06:00:00 


===== CPU Data =====



Unnamed: 0,exported_instance,timestamp,value
0,pod18-node4,1649649660,1.8592150264858165
1,pod18-node4,1649649720,1.859213509941382
2,pod18-node4,1649649780,1.8592120502819665
3,pod18-node4,1649649840,1.8592107627314003
4,pod18-node4,1649649900,1.8592092108782765
...,...,...,...
2455,pod18-node5,1649723160,1.0958105540852525
2456,pod18-node5,1649723220,1.0958088368217433
2457,pod18-node5,1649723280,1.095807067448351
2458,pod18-node5,1649723340,1.0958053357542963


Unnamed: 0,exported_instance,timestamp,value
0,pod18-node4,1649649660,1.8592150264858165
1,pod18-node4,1649649720,1.859213509941382
2,pod18-node4,1649649780,1.8592120502819665
3,pod18-node4,1649649840,1.8592107627314003
4,pod18-node4,1649649900,1.8592092108782765
...,...,...,...
2455,pod18-node5,1649723160,1.0958105540852525
2456,pod18-node5,1649723220,1.0958088368217433
2457,pod18-node5,1649723280,1.095807067448351
2458,pod18-node5,1649723340,1.0958053357542963


=====Interfaces (including libvirt) Dropped / Errors=====



[Empty DataFrame
Columns: []
Index: [], Empty DataFrame
Columns: []
Index: []]
=====Memory Data =====

