diff options
author | earrage <eddie.arrage@huawei.com> | 2018-10-10 11:54:51 -0700 |
---|---|---|
committer | earrage <eddie.arrage@huawei.com> | 2018-10-10 11:59:49 -0700 |
commit | c36f82e76df1f2f506f770935093828d3d573e6b (patch) | |
tree | e5f8dadb2359c5fc5a68737f21f2e140c1763dcb /clover/collector/process/collect.py | |
parent | e716285dfe01e2373984550495fd2cf02dbf959d (diff) |
Improve data ingestion reliability and functionality
- Modify deployment namespace to clover-system and account
for cassandra moving to the clover-system namespace
- Increase k8s compute resource assigned to cassandra to deal
with performance issues
- Add additional fields (user-agent, request/response size,
status codes) to span schema definition and modify primary keys
- Improve exception handling to prevent collect process from
crashing
- Minor changes to support tracing/monitoring with Istio 1.0
- Inhibit logging for debug messages
- Increase time back and number of traces to fetch in
each sampling interval to deal with Jaeger REST interface
returning trace data out of order under load
(tested to 300 conn/sec; 12K connections currently)
- Move trace insert into batch mode to cassandra
- Read visibility services to analyze from redis rather than
defaults (cloverctl, UI or clover-controller REST will set)
- Remove local directory copies in docker build, as image is
based on base clover container
Change-Id: Ibae98ef5057e52a6eeddd9ebbcfaeb644caec36c
Signed-off-by: earrage <eddie.arrage@huawei.com>
Diffstat (limited to 'clover/collector/process/collect.py')
-rw-r--r-- | clover/collector/process/collect.py | 115 |
1 files changed, 73 insertions, 42 deletions
diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py index d8beb49..3d9df8a 100644 --- a/clover/collector/process/collect.py +++ b/clover/collector/process/collect.py @@ -16,19 +16,25 @@ import argparse import logging import ast -TRACING_SERVICES = ['istio-ingress'] TRACING_PORT = "16686" MONITORING_PORT = "9090" CASSANDRA_PORT = 9042 # Provide as integer MONITORING_HOST = "prometheus.istio-system" -TRACING_HOST = "jaeger-deployment.istio-system" -CASSANDRA_HOSTS = ['cassandra.default'] +TRACING_HOST = "tracing.istio-system" +CASSANDRA_HOSTS = ['cassandra.clover-system'] class Collector: def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts): - logging.basicConfig(filename='collector.log', level=logging.DEBUG) + + # logging.basicConfig(filename='collector.log', level=logging.DEBUG) + logging.basicConfig(filename='collector.log', level=logging.ERROR) + # logging.getLogger("requests").setLevel(logging.DEBUG) + logging.getLogger("requests").setLevel(logging.ERROR) + # logging.getLogger("urllib3").setLevel(logging.DEBUG) + logging.getLogger("urllib3").setLevel(logging.ERROR) + try: self.t = Tracing(t_host, t_port, '', False) monitoring_url = "http://{}:{}".format(m_host, m_port) @@ -40,63 +46,89 @@ class Collector: logging.debug(e) # Toplevel tracing retrieval and batch insert - def get_tracing(self, services, time_back=20): - self.c.set_batch() - for service in services: - traces = self.t.getTraces(service, time_back) - try: - self.set_tracing(traces) - except Exception as e: - logging.debug(e) - self.c.execute_batch() + def get_tracing(self, time_back=300): + try: + services = self.r.get_services() + for service in services: + traces = self.t.getTraces(service.replace("_", "-"), time_back, + '20000') + try: + self.set_tracing(traces) + except Exception as e: + logging.debug(e) + + # Update list of available services from tracing + services = self.t.getServices() + self.r.set_tracing_services(services) + except Exception as e: + logging.debug(e) # Insert to cassandra visibility traces and spans tables def set_tracing(self, trace): for traces in trace['data']: + self.c.set_batch() for spans in traces['spans']: + try: span = {} span['spanID'] = spans['spanID'] span['duration'] = spans['duration'] span['startTime'] = spans['startTime'] span['operationName'] = spans['operationName'] + tag = {} for tags in spans['tags']: tag[tags['key']] = tags['value'] - self.c.insert_tracing('spans', traces['traceID'], - span, tag) + self.c.insert_span(traces['traceID'], span, tag) + except Exception as e: + logging.debug("spans loop") + logging.debug(e) + process_list = [] for p in traces['processes']: process_list.append(p) service_names = [] for pname in process_list: service_names.append(traces['processes'][pname]['serviceName']) - self.c.insert_trace(traces['traceID'], service_names) + try: + self.c.insert_trace(traces['traceID'], service_names) + self.c.execute_batch() + except Exception as e: + logging.debug(e) # Insert to cassandra visibility metrics table def get_monitoring(self): - # Fetch collector service/metric lists from redis - service_names = self.r.get_services() - metric_prefixes, metric_suffixes = self.r.get_metrics() - - self.c.set_batch() - for sname in service_names: - for prefix in metric_prefixes: - for suffix in metric_suffixes: - try: - metric_name = prefix + sname + suffix - query_params = { - "type": "instant", - "query": metric_name - } - data = self.m.query(query_params) - m_value = data['data']['result'][0]['value'][1] - m_time = data['data']['result'][0]['value'][0] - mn = data['data']['result'][0]['metric']['__name__'] - self.c.insert_metric(mn, m_value, str(m_time), sname) - except Exception as e: - logging.debug(e) - self.c.execute_batch() + try: + # Fetch collector service/metric lists from redis + service_names = self.r.get_services() + metric_prefixes, metric_suffixes = self.r.get_metrics() + + self.c.set_batch() + for sname in service_names: + for prefix in metric_prefixes: + for suffix in metric_suffixes: + try: + metric_name = prefix + sname + suffix + query_params = { + "type": "instant", + "query": metric_name + } + data = self.m.query(query_params) + m_value = data['data']['result'][0]['value'][1] + m_time = data['data']['result'][0]['value'][0] + mn = data[ + 'data']['result'][0]['metric']['__name__'] + self.c.insert_metric( + mn, m_value, str(m_time), sname) + + # Add to redis temporarily + self.r.r.set(mn, m_value) + + except Exception as e: + logging.debug(e) + self.c.execute_batch() + except Exception as e: + logging.debug(e) # TODO add batch retrieval for monitoring metrics # query_range_param = { @@ -124,11 +156,13 @@ def main(args): loop = True while loop: try: - c.get_tracing(args['t_services']) + c.get_tracing() c.get_monitoring() time.sleep(int(args['sinterval'])) except KeyboardInterrupt: loop = False + except Exception as e: + logging.debug(e) if __name__ == '__main__': @@ -154,9 +188,6 @@ if __name__ == '__main__': parser.add_argument( '-c_port', default=CASSANDRA_PORT, help='Port to access Cassandra cluster') - parser.add_argument( - '-t_services', default=TRACING_SERVICES, - help='Collect services on this list of services') args, unknown = parser.parse_known_args() print(main(vars(args))) |