From c36f82e76df1f2f506f770935093828d3d573e6b Mon Sep 17 00:00:00 2001 From: earrage Date: Wed, 10 Oct 2018 11:54:51 -0700 Subject: Improve data ingestion reliability and functionality - Modify deployment namespace to clover-system and account for cassandra moving to the clover-system namespace - Increase k8s compute resource assigned to cassandra to deal with performance issues - Add additional fields (user-agent, request/response size, status codes) to span schema definition and modify primary keys - Improve exception handling to prevent collect process from crashing - Minor changes to support tracing/monitoring with Istio 1.0 - Inhibit logging for debug messages - Increase time back and number of traces to fetch in each sampling interval to deal with Jaeger REST interface returning trace data out of order under load (tested to 300 conn/sec; 12K connections currently) - Move trace insert into batch mode to cassandra - Read visibility services to analyze from redis rather than defaults (cloverctl, UI or clover-controller REST will set) - Remove local directory copies in docker build, as image is based on base clover container Change-Id: Ibae98ef5057e52a6eeddd9ebbcfaeb644caec36c Signed-off-by: earrage --- clover/collector/process/collect.py | 115 +++++++++++++++++++++++------------- 1 file changed, 73 insertions(+), 42 deletions(-) (limited to 'clover/collector/process/collect.py') diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py index d8beb49..3d9df8a 100644 --- a/clover/collector/process/collect.py +++ b/clover/collector/process/collect.py @@ -16,19 +16,25 @@ import argparse import logging import ast -TRACING_SERVICES = ['istio-ingress'] TRACING_PORT = "16686" MONITORING_PORT = "9090" CASSANDRA_PORT = 9042 # Provide as integer MONITORING_HOST = "prometheus.istio-system" -TRACING_HOST = "jaeger-deployment.istio-system" -CASSANDRA_HOSTS = ['cassandra.default'] +TRACING_HOST = "tracing.istio-system" +CASSANDRA_HOSTS = ['cassandra.clover-system'] class Collector: def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts): - logging.basicConfig(filename='collector.log', level=logging.DEBUG) + + # logging.basicConfig(filename='collector.log', level=logging.DEBUG) + logging.basicConfig(filename='collector.log', level=logging.ERROR) + # logging.getLogger("requests").setLevel(logging.DEBUG) + logging.getLogger("requests").setLevel(logging.ERROR) + # logging.getLogger("urllib3").setLevel(logging.DEBUG) + logging.getLogger("urllib3").setLevel(logging.ERROR) + try: self.t = Tracing(t_host, t_port, '', False) monitoring_url = "http://{}:{}".format(m_host, m_port) @@ -40,63 +46,89 @@ class Collector: logging.debug(e) # Toplevel tracing retrieval and batch insert - def get_tracing(self, services, time_back=20): - self.c.set_batch() - for service in services: - traces = self.t.getTraces(service, time_back) - try: - self.set_tracing(traces) - except Exception as e: - logging.debug(e) - self.c.execute_batch() + def get_tracing(self, time_back=300): + try: + services = self.r.get_services() + for service in services: + traces = self.t.getTraces(service.replace("_", "-"), time_back, + '20000') + try: + self.set_tracing(traces) + except Exception as e: + logging.debug(e) + + # Update list of available services from tracing + services = self.t.getServices() + self.r.set_tracing_services(services) + except Exception as e: + logging.debug(e) # Insert to cassandra visibility traces and spans tables def set_tracing(self, trace): for traces in trace['data']: + self.c.set_batch() for spans in traces['spans']: + try: span = {} span['spanID'] = spans['spanID'] span['duration'] = spans['duration'] span['startTime'] = spans['startTime'] span['operationName'] = spans['operationName'] + tag = {} for tags in spans['tags']: tag[tags['key']] = tags['value'] - self.c.insert_tracing('spans', traces['traceID'], - span, tag) + self.c.insert_span(traces['traceID'], span, tag) + except Exception as e: + logging.debug("spans loop") + logging.debug(e) + process_list = [] for p in traces['processes']: process_list.append(p) service_names = [] for pname in process_list: service_names.append(traces['processes'][pname]['serviceName']) - self.c.insert_trace(traces['traceID'], service_names) + try: + self.c.insert_trace(traces['traceID'], service_names) + self.c.execute_batch() + except Exception as e: + logging.debug(e) # Insert to cassandra visibility metrics table def get_monitoring(self): - # Fetch collector service/metric lists from redis - service_names = self.r.get_services() - metric_prefixes, metric_suffixes = self.r.get_metrics() - - self.c.set_batch() - for sname in service_names: - for prefix in metric_prefixes: - for suffix in metric_suffixes: - try: - metric_name = prefix + sname + suffix - query_params = { - "type": "instant", - "query": metric_name - } - data = self.m.query(query_params) - m_value = data['data']['result'][0]['value'][1] - m_time = data['data']['result'][0]['value'][0] - mn = data['data']['result'][0]['metric']['__name__'] - self.c.insert_metric(mn, m_value, str(m_time), sname) - except Exception as e: - logging.debug(e) - self.c.execute_batch() + try: + # Fetch collector service/metric lists from redis + service_names = self.r.get_services() + metric_prefixes, metric_suffixes = self.r.get_metrics() + + self.c.set_batch() + for sname in service_names: + for prefix in metric_prefixes: + for suffix in metric_suffixes: + try: + metric_name = prefix + sname + suffix + query_params = { + "type": "instant", + "query": metric_name + } + data = self.m.query(query_params) + m_value = data['data']['result'][0]['value'][1] + m_time = data['data']['result'][0]['value'][0] + mn = data[ + 'data']['result'][0]['metric']['__name__'] + self.c.insert_metric( + mn, m_value, str(m_time), sname) + + # Add to redis temporarily + self.r.r.set(mn, m_value) + + except Exception as e: + logging.debug(e) + self.c.execute_batch() + except Exception as e: + logging.debug(e) # TODO add batch retrieval for monitoring metrics # query_range_param = { @@ -124,11 +156,13 @@ def main(args): loop = True while loop: try: - c.get_tracing(args['t_services']) + c.get_tracing() c.get_monitoring() time.sleep(int(args['sinterval'])) except KeyboardInterrupt: loop = False + except Exception as e: + logging.debug(e) if __name__ == '__main__': @@ -154,9 +188,6 @@ if __name__ == '__main__': parser.add_argument( '-c_port', default=CASSANDRA_PORT, help='Port to access Cassandra cluster') - parser.add_argument( - '-t_services', default=TRACING_SERVICES, - help='Collect services on this list of services') args, unknown = parser.parse_known_args() print(main(vars(args))) -- cgit 1.2.3-korg