summaryrefslogtreecommitdiffstats
path: root/clover/collector/process/collect.py
diff options
context:
space:
mode:
Diffstat (limited to 'clover/collector/process/collect.py')
-rw-r--r--clover/collector/process/collect.py115
1 files changed, 73 insertions, 42 deletions
diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py
index d8beb49..3d9df8a 100644
--- a/clover/collector/process/collect.py
+++ b/clover/collector/process/collect.py
@@ -16,19 +16,25 @@ import argparse
import logging
import ast
-TRACING_SERVICES = ['istio-ingress']
+# Default endpoints for the collector. CASSANDRA_PORT is the argparse default
+# for -c_port; the other constants are presumably wired the same way -- confirm.
TRACING_PORT = "16686"
MONITORING_PORT = "9090"
CASSANDRA_PORT = 9042 # Provide as integer
MONITORING_HOST = "prometheus.istio-system"
-TRACING_HOST = "jaeger-deployment.istio-system"
-CASSANDRA_HOSTS = ['cassandra.default']
+TRACING_HOST = "tracing.istio-system"
+CASSANDRA_HOSTS = ['cassandra.clover-system']
class Collector:
def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts):
- logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+
+ # logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+ logging.basicConfig(filename='collector.log', level=logging.ERROR)
+ # logging.getLogger("requests").setLevel(logging.DEBUG)
+ logging.getLogger("requests").setLevel(logging.ERROR)
+ # logging.getLogger("urllib3").setLevel(logging.DEBUG)
+ logging.getLogger("urllib3").setLevel(logging.ERROR)
+
try:
self.t = Tracing(t_host, t_port, '', False)
monitoring_url = "http://{}:{}".format(m_host, m_port)
@@ -40,63 +46,89 @@ class Collector:
logging.debug(e)
# Toplevel tracing retrieval and batch insert
- def get_tracing(self, services, time_back=20):
- self.c.set_batch()
- for service in services:
- traces = self.t.getTraces(service, time_back)
- try:
- self.set_tracing(traces)
- except Exception as e:
- logging.debug(e)
- self.c.execute_batch()
+    def get_tracing(self, time_back=300):
+        """Collect recent traces for every known service and store them.
+
+        The service list is read from redis (``self.r.get_services()``).
+        Each name is queried against the tracing backend with underscores
+        replaced by hyphens and the result is handed to ``set_tracing``.
+        Afterwards the services reported by the tracing backend are written
+        back through ``self.r.set_tracing_services``.
+
+        Failures are logged at ERROR level and never raised, so a bad
+        collection cycle does not stop the caller's polling loop.
+
+        :param time_back: look-back window forwarded to ``getTraces``
+            (units are not visible here -- TODO confirm).
+        """
+        try:
+            services = self.r.get_services()
+            for service in services:
+                # The fetch is inside the per-service try so that one
+                # failing service does not abort the remaining ones.
+                try:
+                    # '20000' is forwarded as the third getTraces argument
+                    # (presumably a result limit -- confirm).
+                    traces = self.t.getTraces(
+                        service.replace("_", "-"), time_back, '20000')
+                    self.set_tracing(traces)
+                except Exception:
+                    # The log level is ERROR, so DEBUG records would be
+                    # dropped; report failures where they are visible.
+                    logging.exception(
+                        "Failed to collect traces for service %s", service)
+
+            # Update list of available services from tracing
+            self.r.set_tracing_services(self.t.getServices())
+        except Exception:
+            logging.exception("Tracing collection cycle failed")
# Insert to cassandra visibility traces and spans tables
+ # trace: mapping whose 'data' list holds traces, each read here through its
+ # 'traceID', 'spans' and 'processes' entries (presumably the value returned
+ # by Tracing.getTraces -- confirm against caller).
+ # Span and trace insert errors are logged at DEBUG and processing continues.
def set_tracing(self, trace):
for traces in trace['data']:
+ # One batch per trace; it is executed after insert_trace below.
+ self.c.set_batch()
for spans in traces['spans']:
+ try:
span = {}
span['spanID'] = spans['spanID']
span['duration'] = spans['duration']
span['startTime'] = spans['startTime']
span['operationName'] = spans['operationName']
+
+ # Flatten the span's tag list into a key -> value dict.
tag = {}
for tags in spans['tags']:
tag[tags['key']] = tags['value']
- self.c.insert_tracing('spans', traces['traceID'],
- span, tag)
+ self.c.insert_span(traces['traceID'], span, tag)
+ except Exception as e:
+ # NOTE(review): __init__ configures logging at ERROR level, so these
+ # DEBUG records are presumably dropped -- confirm this is intended.
+ logging.debug("spans loop")
+ logging.debug(e)
+
+ # Collect the serviceName of every process recorded for this trace.
process_list = []
for p in traces['processes']:
process_list.append(p)
service_names = []
for pname in process_list:
service_names.append(traces['processes'][pname]['serviceName'])
- self.c.insert_trace(traces['traceID'], service_names)
+ try:
+ self.c.insert_trace(traces['traceID'], service_names)
+ self.c.execute_batch()
+ except Exception as e:
+ logging.debug(e)
# Insert to cassandra visibility metrics table
def get_monitoring(self):
- # Fetch collector service/metric lists from redis
- service_names = self.r.get_services()
- metric_prefixes, metric_suffixes = self.r.get_metrics()
-
- self.c.set_batch()
- for sname in service_names:
- for prefix in metric_prefixes:
- for suffix in metric_suffixes:
- try:
- metric_name = prefix + sname + suffix
- query_params = {
- "type": "instant",
- "query": metric_name
- }
- data = self.m.query(query_params)
- m_value = data['data']['result'][0]['value'][1]
- m_time = data['data']['result'][0]['value'][0]
- mn = data['data']['result'][0]['metric']['__name__']
- self.c.insert_metric(mn, m_value, str(m_time), sname)
- except Exception as e:
- logging.debug(e)
- self.c.execute_batch()
+        """Query the monitoring backend for every service metric and store it.
+
+        Service names and metric prefixes/suffixes are read from redis.
+        Each ``prefix + service + suffix`` name is sent as an instant query
+        through ``self.m.query``.  The first result's value and timestamp
+        are passed to ``self.c.insert_metric`` inside a batch opened with
+        ``set_batch`` and executed at the end; the value is also written to
+        redis under the metric's ``__name__``.
+
+        Failures are logged at ERROR level and never raised, so a bad
+        collection cycle does not stop the caller's polling loop.
+        """
+        try:
+            # Fetch collector service/metric lists from redis
+            service_names = self.r.get_services()
+            metric_prefixes, metric_suffixes = self.r.get_metrics()
+
+            self.c.set_batch()
+            for sname in service_names:
+                for prefix in metric_prefixes:
+                    for suffix in metric_suffixes:
+                        try:
+                            metric_name = prefix + sname + suffix
+                            data = self.m.query({
+                                "type": "instant",
+                                "query": metric_name,
+                            })
+                            results = data['data']['result']
+                            if not results:
+                                # No sample for this metric: skip it
+                                # explicitly instead of via IndexError.
+                                continue
+                            sample = results[0]
+                            m_value = sample['value'][1]
+                            m_time = sample['value'][0]
+                            mn = sample['metric']['__name__']
+                            self.c.insert_metric(
+                                mn, m_value, str(m_time), sname)
+
+                            # Add to redis temporarily
+                            self.r.r.set(mn, m_value)
+                        except Exception as e:
+                            # The log level is ERROR, so DEBUG records
+                            # would be dropped; keep failures visible.
+                            logging.error(
+                                "Failed to collect metric %s%s%s: %s",
+                                prefix, sname, suffix, e)
+            self.c.execute_batch()
+        except Exception:
+            logging.exception("Monitoring collection cycle failed")
# TODO add batch retrieval for monitoring metrics
# query_range_param = {
@@ -124,11 +156,13 @@ def main(args):
loop = True
while loop:
try:
- c.get_tracing(args['t_services'])
+ c.get_tracing()
c.get_monitoring()
time.sleep(int(args['sinterval']))
except KeyboardInterrupt:
loop = False
+ except Exception as e:
+ logging.debug(e)
if __name__ == '__main__':
@@ -154,9 +188,6 @@ if __name__ == '__main__':
parser.add_argument(
'-c_port', default=CASSANDRA_PORT,
help='Port to access Cassandra cluster')
- parser.add_argument(
- '-t_services', default=TRACING_SERVICES,
- help='Collect services on this list of services')
args, unknown = parser.parse_known_args()
print(main(vars(args)))