diff options
author | Eddie Arrage <eddie.arrage@huawei.com> | 2018-05-09 18:33:55 +0000 |
---|---|---|
committer | Eddie Arrage <eddie.arrage@huawei.com> | 2018-06-12 06:43:48 +0000 |
commit | dbece18d19c3977019c6727fcbe7a436031666fe (patch) | |
tree | 8eda38ba5fc07f5afc82904a610dc0dad859a4a4 /clover/collector/process/collect.py | |
parent | 115d3c9ba4de194534cdf0be827c16e04e49951b (diff) |
Initial commit for Clover Collector
- Added a container named clover-collector using clover
container as a base with build script
- GRPC server to manage collector process
- Cassandra DB client interface to initialize visibility keyspace
- Init messaging adds table schemas for tracing - traces & spans
- Adds table for monitoring - metrics
- Does not implement Cassandra server but developed using
public Cassandra docker container
- Collector process in simple loop that periodically fetches
traces and monitoring data and inserts to Cassandra - not optimized
for batch retrieval yet for monitoring
- CLI interface added to collector process and used
by GRPC server for configuration
- Simple GRPC client script to test GRPC server and start/stop
of collector process
- Collector process can be configured with access for tracing,
monitoring and Cassandra
- Added a return value in monitoring query method
- Added ability to truncate tracing, metrics and spans tables
in cql
- Added cql prepared statements and batch insert for metrics
and spans
- Align cql connection to cql deployment within k8s
- Fix issue with cql host list using ast and collect process
args with background argument
- Added redis interface to accept service/metric list
externally for monitoring (will work in conjunction
with clover-controller)
- Use k8s DNS names and default ports for monitoring, tracing
and cassandra
- Added yaml manifest renderer/template for collector
Change-Id: I3e4353e28844c4ce9c185ff4638012b66c7fff67
Signed-off-by: Eddie Arrage <eddie.arrage@huawei.com>
Diffstat (limited to 'clover/collector/process/collect.py')
-rw-r--r-- | clover/collector/process/collect.py | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py new file mode 100644 index 0000000..d8beb49 --- /dev/null +++ b/clover/collector/process/collect.py @@ -0,0 +1,162 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +from clover.tracing.tracing import Tracing +from clover.monitoring.monitoring import Monitoring +from clover.collector.db.cassops import CassandraOps +from clover.collector.db.redisops import RedisOps + +# import pprint +import time +import argparse +import logging +import ast + +TRACING_SERVICES = ['istio-ingress'] +TRACING_PORT = "16686" +MONITORING_PORT = "9090" +CASSANDRA_PORT = 9042 # Provide as integer +MONITORING_HOST = "prometheus.istio-system" +TRACING_HOST = "jaeger-deployment.istio-system" +CASSANDRA_HOSTS = ['cassandra.default'] + + +class Collector: + + def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts): + logging.basicConfig(filename='collector.log', level=logging.DEBUG) + try: + self.t = Tracing(t_host, t_port, '', False) + monitoring_url = "http://{}:{}".format(m_host, m_port) + self.m = Monitoring(monitoring_url) + self.c = CassandraOps(c_hosts, int(c_port)) + self.c.set_prepared() + self.r = RedisOps() + except Exception as e: + logging.debug(e) + + # Toplevel tracing retrieval and batch insert + def get_tracing(self, services, time_back=20): + self.c.set_batch() + for service in services: + traces = self.t.getTraces(service, time_back) + try: + self.set_tracing(traces) + except Exception as e: + logging.debug(e) + self.c.execute_batch() + + # Insert to cassandra visibility traces and spans tables + def set_tracing(self, trace): + for traces in trace['data']: + for spans in traces['spans']: + span = {} + span['spanID'] = spans['spanID'] + span['duration'] = spans['duration'] + span['startTime'] = spans['startTime'] + span['operationName'] = spans['operationName'] + tag = {} + for tags in spans['tags']: + tag[tags['key']] = tags['value'] + self.c.insert_tracing('spans', traces['traceID'], + span, tag) + process_list = [] + for p in traces['processes']: + process_list.append(p) + service_names = [] + for pname in process_list: + service_names.append(traces['processes'][pname]['serviceName']) + self.c.insert_trace(traces['traceID'], service_names) + + # Insert to cassandra visibility metrics table + def get_monitoring(self): + + # Fetch collector service/metric lists from redis + service_names = self.r.get_services() + metric_prefixes, metric_suffixes = self.r.get_metrics() + + self.c.set_batch() + for sname in service_names: + for prefix in metric_prefixes: + for suffix in metric_suffixes: + try: + metric_name = prefix + sname + suffix + query_params = { + "type": "instant", + "query": metric_name + } + data = self.m.query(query_params) + m_value = data['data']['result'][0]['value'][1] + m_time = data['data']['result'][0]['value'][0] + mn = data['data']['result'][0]['metric']['__name__'] + self.c.insert_metric(mn, m_value, str(m_time), sname) + except Exception as e: + logging.debug(e) + self.c.execute_batch() + + # TODO add batch retrieval for monitoring metrics + # query_range_param = { + # "type": "range", + # "query": "tbd", + # "start": "60m", + # "end": "5m", + # "step": "30s" + # } + # data = self.m.query(query_range_param) + # pp = pprint.PrettyPrinter(indent=2) + # pp.pprint(data) + + +def main(args): + if isinstance(args['c_hosts'], basestring): + ch = ast.literal_eval(args['c_hosts']) + else: + ch = args['c_hosts'] + + c = Collector(args['t_port'], args['t_host'], args['m_port'], + args['m_host'], args['c_port'], ch) + + # Collector loop + loop = True + while loop: + try: + c.get_tracing(args['t_services']) + c.get_monitoring() + time.sleep(int(args['sinterval'])) + except KeyboardInterrupt: + loop = False + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '-sinterval', default=5, + help='Sample interval for collector loop') + parser.add_argument( + '-t_port', default=TRACING_PORT, + help='Port to access Jaeger tracing') + parser.add_argument( + '-m_port', default=MONITORING_PORT, + help='Port to access Prometheus monitoring') + parser.add_argument( + '-t_host', default=TRACING_HOST, + help='Host to access Jaeger tracing') + parser.add_argument( + '-m_host', default=MONITORING_HOST, + help='Host to access Prometheus monitoring') + parser.add_argument( + '-c_hosts', default=CASSANDRA_HOSTS, + help='Host(s) to access Cassandra cluster') + parser.add_argument( + '-c_port', default=CASSANDRA_PORT, + help='Port to access Cassandra cluster') + parser.add_argument( + '-t_services', default=TRACING_SERVICES, + help='Collect services on this list of services') + + args, unknown = parser.parse_known_args() + print(main(vars(args))) |