From c36f82e76df1f2f506f770935093828d3d573e6b Mon Sep 17 00:00:00 2001 From: earrage Date: Wed, 10 Oct 2018 11:54:51 -0700 Subject: Improve data ingestion reliability and functionality - Modify deployment namespace to clover-system and account for cassandra moving to the clover-system namespace - Increase k8s compute resource assigned to cassandra to deal with performance issues - Add additional fields (user-agent, request/response size, status codes) to span schema definition and modify primary keys - Improve exception handling to prevent collect process from crashing - Minor changes to support tracing/monitoring with Istio 1.0 - Inhibit logging for debug messages - Increase time back and number of traces to fetch in each sampling interval to deal with Jaeger REST interface returning trace data out of order under load (tested to 300 conn/sec; 12K connections currently) - Move trace insert into batch mode to cassandra - Read visibility services to analyze from redis rather than defaults (cloverctl, UI or clover-controller REST will set) - Remove local directory copies in docker build, as image is based on base clover container Change-Id: Ibae98ef5057e52a6eeddd9ebbcfaeb644caec36c Signed-off-by: earrage --- clover/collector/db/cassops.py | 54 ++++++++------ clover/collector/db/redisops.py | 14 ++-- clover/collector/docker/Dockerfile | 9 --- clover/collector/grpc/collector_client.py | 6 +- clover/collector/grpc/collector_server.py | 2 +- clover/collector/process/collect.py | 115 +++++++++++++++++++----------- clover/collector/yaml/manifest.template | 2 + clover/tools/yaml/cassandra.yaml | 12 ++-- 8 files changed, 128 insertions(+), 86 deletions(-) diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py index 6553cff..0bc9d84 100644 --- a/clover/collector/db/cassops.py +++ b/clover/collector/db/cassops.py @@ -9,7 +9,7 @@ from cassandra.cluster import Cluster from cassandra.query import BatchStatement import logging -CASSANDRA_HOSTS = ['cassandra.default'] +CASSANDRA_HOSTS = ['cassandra.clover-system'] class CassandraOps: @@ -57,13 +57,18 @@ class CassandraOps: spanid text, traceid text, duration int, - start_time int, + start_time timestamp, processid text, operation_name text, node_id text, http_url text, + user_agent text, + request_size text, + response_size text, + status_code text, upstream_cluster text, - PRIMARY KEY (spanid, traceid) + insert_time timestamp, + PRIMARY KEY (traceid, spanid) ) """) @@ -82,11 +87,18 @@ class CassandraOps: def set_prepared(self): self.session.set_keyspace(self.keyspace) - self.insert_tracing_stmt = self.session.prepare( + self.insert_span_stmt = self.session.prepare( """ INSERT INTO spans (spanid, traceid, duration, operation_name, - node_id, http_url, upstream_cluster) - VALUES (?, ?, ?, ?, ?, ?, ?) + node_id, http_url, upstream_cluster, start_time, user_agent, + request_size, response_size, status_code, insert_time) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now())) + """ + ) + self.insert_trace_stmt = self.session.prepare( + """ + INSERT INTO traces (traceid, processes) + VALUES (?, ?) """ ) self.insert_metric_stmt = self.session.prepare( @@ -103,31 +115,31 @@ class CassandraOps: def execute_batch(self): self.session.execute(self.batch) - def insert_tracing(self, table, traceid, s, tags): + def insert_span(self, traceid, s, tags): self.session.set_keyspace(self.keyspace) if 'upstream_cluster' not in tags: - logging.debug('NO UPSTREAM_CLUSTER KEY') + # logging.debug('NO UPSTREAM_CLUSTER KEY') tags['upstream_cluster'] = 'none' try: - self.batch.add(self.insert_tracing_stmt, + self.batch.add(self.insert_span_stmt, (s['spanID'], traceid, s['duration'], s['operationName'], tags['node_id'], - tags['http.url'], tags['upstream_cluster'])) + tags['http.url'], tags['upstream_cluster'], + int(str(s['startTime'])[0:13]), tags['user_agent'], + tags['request_size'], tags['response_size'], + tags['http.status_code'])) + except KeyError as e: + logging.debug('Insert span error: {}, Tags: {}'.format(e, tags)) except Exception as e: - logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid, - s['duration'], s['operationName'], tags['node_id'], - tags['http.url'], tags['upstream_cluster'])) - logging.debug(e) + logging.debug('Insert span error: {}'.format(e)) + logging.debug('Tags: {}'.format(tags)) + logging.debug('Span toplevel: {}'.format(s)) + logging.debug( + 'startTime: {}'.format(int(str(s['startTime'])[0:13]))) def insert_trace(self, traceid, processes): self.session.set_keyspace(self.keyspace) - self.session.execute( - """ - INSERT INTO traces (traceid, processes) - VALUES (%s, %s) - """, - (traceid, processes) - ) + self.batch.add(self.insert_trace_stmt, (traceid, processes)) def insert_metric(self, m_name, m_value, m_time, service): self.session.set_keyspace(self.keyspace) diff --git a/clover/collector/db/redisops.py b/clover/collector/db/redisops.py index e80c417..24fbeb9 100644 --- a/clover/collector/db/redisops.py +++ b/clover/collector/db/redisops.py @@ -8,8 +8,7 @@ import redis import logging -REDIS_HOST = 'redis' -# REDIS_HOST = '10.244.0.85' +REDIS_HOST = 'redis.default' class RedisOps: @@ -27,11 +26,16 @@ class RedisOps: for s in service_names: self.r.sadd(skey, s) + def set_tracing_services(self, services, skey='tracing_services'): + self.r.delete(skey) + for s in services: + self.r.sadd(skey, s) + def init_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'): - metric_prefixes = ['envoy_cluster_out_', 'envoy_cluster_in_'] + metric_prefixes = ['envoy_cluster_outbound_', 'envoy_cluster_inbound_'] metric_suffixes = [ - '_default_svc_cluster_local_http_internal_upstream_rq_2xx', - '_default_svc_cluster_local_http_upstream_cx_active'] + '_default_svc_cluster_local_upstream_rq_2xx', + '_default_svc_cluster_local_upstream_cx_active'] for p in metric_prefixes: self.r.sadd(pkey, p) for s in metric_suffixes: diff --git a/clover/collector/docker/Dockerfile b/clover/collector/docker/Dockerfile index 1714420..7b6effd 100644 --- a/clover/collector/docker/Dockerfile +++ b/clover/collector/docker/Dockerfile @@ -16,15 +16,6 @@ ENV CLOVER_REPO_DIR="${REPOS_DIR}/clover" RUN python -m pip install cassandra-driver redis # Set work directory -WORKDIR ${CLOVER_REPO_DIR} - -COPY /process clover/collector/process -COPY /grpc clover/collector/grpc -COPY /db clover/collector/db -COPY __init__.py clover/collector/__init__.py - -RUN pip install . - WORKDIR "${CLOVER_REPO_DIR}/clover/collector" CMD ./process/grpc_process.sh no_schema_init diff --git a/clover/collector/grpc/collector_client.py b/clover/collector/grpc/collector_client.py index b9e9f67..65ff2ff 100644 --- a/clover/collector/grpc/collector_client.py +++ b/clover/collector/grpc/collector_client.py @@ -55,7 +55,7 @@ def get_podip(pod_name): def init_visibility(stub): try: - cassandra_hosts = pickle.dumps(['cassandra.default']) + cassandra_hosts = pickle.dumps(['cassandra.clover-system']) response = stub.InitVisibility(collector_pb2.ConfigCassandra( cassandra_hosts=cassandra_hosts, cassandra_port=9042)) except Exception as e: @@ -65,7 +65,7 @@ def init_visibility(stub): def clean_visibility(stub): try: - cassandra_hosts = pickle.dumps(['cassandra.default']) + cassandra_hosts = pickle.dumps(['cassandra.clover-system']) schemas = pickle.dumps(['spans', 'traces', 'metrics']) response = stub.TruncateVisibility(collector_pb2.Schemas( schemas=schemas, cassandra_hosts=cassandra_hosts, @@ -77,7 +77,7 @@ def clean_visibility(stub): def start_collector(stub): try: - cassandra_hosts = pickle.dumps(['cassandra.default']) + cassandra_hosts = pickle.dumps(['cassandra.clover-system']) response = stub.StartCollector(collector_pb2.ConfigCollector( t_port='16686', t_host='jaeger-deployment.istio-system', m_port='9090', m_host='prometheus.istio-system', diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py index c2eb221..a10078e 100644 --- a/clover/collector/grpc/collector_server.py +++ b/clover/collector/grpc/collector_server.py @@ -29,7 +29,7 @@ class Controller(collector_pb2_grpc.ControllerServicer): level=logging.DEBUG) self.collector = 0 if init_visibility == 'set_schemas': - cassandra_hosts = pickle.dumps(['cassandra.default']) + cassandra_hosts = pickle.dumps(['cassandra.clover-system']) self.InitVisibility(collector_pb2.ConfigCassandra( cassandra_port=9042, cassandra_hosts=cassandra_hosts), "") diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py index d8beb49..3d9df8a 100644 --- a/clover/collector/process/collect.py +++ b/clover/collector/process/collect.py @@ -16,19 +16,25 @@ import argparse import logging import ast -TRACING_SERVICES = ['istio-ingress'] TRACING_PORT = "16686" MONITORING_PORT = "9090" CASSANDRA_PORT = 9042 # Provide as integer MONITORING_HOST = "prometheus.istio-system" -TRACING_HOST = "jaeger-deployment.istio-system" -CASSANDRA_HOSTS = ['cassandra.default'] +TRACING_HOST = "tracing.istio-system" +CASSANDRA_HOSTS = ['cassandra.clover-system'] class Collector: def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts): - logging.basicConfig(filename='collector.log', level=logging.DEBUG) + + # logging.basicConfig(filename='collector.log', level=logging.DEBUG) + logging.basicConfig(filename='collector.log', level=logging.ERROR) + # logging.getLogger("requests").setLevel(logging.DEBUG) + logging.getLogger("requests").setLevel(logging.ERROR) + # logging.getLogger("urllib3").setLevel(logging.DEBUG) + logging.getLogger("urllib3").setLevel(logging.ERROR) + try: self.t = Tracing(t_host, t_port, '', False) monitoring_url = "http://{}:{}".format(m_host, m_port) @@ -40,63 +46,89 @@ class Collector: logging.debug(e) # Toplevel tracing retrieval and batch insert - def get_tracing(self, services, time_back=20): - self.c.set_batch() - for service in services: - traces = self.t.getTraces(service, time_back) - try: - self.set_tracing(traces) - except Exception as e: - logging.debug(e) - self.c.execute_batch() + def get_tracing(self, time_back=300): + try: + services = self.r.get_services() + for service in services: + traces = self.t.getTraces(service.replace("_", "-"), time_back, + '20000') + try: + self.set_tracing(traces) + except Exception as e: + logging.debug(e) + + # Update list of available services from tracing + services = self.t.getServices() + self.r.set_tracing_services(services) + except Exception as e: + logging.debug(e) # Insert to cassandra visibility traces and spans tables def set_tracing(self, trace): for traces in trace['data']: + self.c.set_batch() for spans in traces['spans']: + try: span = {} span['spanID'] = spans['spanID'] span['duration'] = spans['duration'] span['startTime'] = spans['startTime'] span['operationName'] = spans['operationName'] + tag = {} for tags in spans['tags']: tag[tags['key']] = tags['value'] - self.c.insert_tracing('spans', traces['traceID'], - span, tag) + self.c.insert_span(traces['traceID'], span, tag) + except Exception as e: + logging.debug("spans loop") + logging.debug(e) + process_list = [] for p in traces['processes']: process_list.append(p) service_names = [] for pname in process_list: service_names.append(traces['processes'][pname]['serviceName']) - self.c.insert_trace(traces['traceID'], service_names) + try: + self.c.insert_trace(traces['traceID'], service_names) + self.c.execute_batch() + except Exception as e: + logging.debug(e) # Insert to cassandra visibility metrics table def get_monitoring(self): - # Fetch collector service/metric lists from redis - service_names = self.r.get_services() - metric_prefixes, metric_suffixes = self.r.get_metrics() - - self.c.set_batch() - for sname in service_names: - for prefix in metric_prefixes: - for suffix in metric_suffixes: - try: - metric_name = prefix + sname + suffix - query_params = { - "type": "instant", - "query": metric_name - } - data = self.m.query(query_params) - m_value = data['data']['result'][0]['value'][1] - m_time = data['data']['result'][0]['value'][0] - mn = data['data']['result'][0]['metric']['__name__'] - self.c.insert_metric(mn, m_value, str(m_time), sname) - except Exception as e: - logging.debug(e) - self.c.execute_batch() + try: + # Fetch collector service/metric lists from redis + service_names = self.r.get_services() + metric_prefixes, metric_suffixes = self.r.get_metrics() + + self.c.set_batch() + for sname in service_names: + for prefix in metric_prefixes: + for suffix in metric_suffixes: + try: + metric_name = prefix + sname + suffix + query_params = { + "type": "instant", + "query": metric_name + } + data = self.m.query(query_params) + m_value = data['data']['result'][0]['value'][1] + m_time = data['data']['result'][0]['value'][0] + mn = data[ + 'data']['result'][0]['metric']['__name__'] + self.c.insert_metric( + mn, m_value, str(m_time), sname) + + # Add to redis temporarily + self.r.r.set(mn, m_value) + + except Exception as e: + logging.debug(e) + self.c.execute_batch() + except Exception as e: + logging.debug(e) # TODO add batch retrieval for monitoring metrics # query_range_param = { @@ -124,11 +156,13 @@ def main(args): loop = True while loop: try: - c.get_tracing(args['t_services']) + c.get_tracing() c.get_monitoring() time.sleep(int(args['sinterval'])) except KeyboardInterrupt: loop = False + except Exception as e: + logging.debug(e) if __name__ == '__main__': @@ -154,9 +188,6 @@ if __name__ == '__main__': parser.add_argument( '-c_port', default=CASSANDRA_PORT, help='Port to access Cassandra cluster') - parser.add_argument( - '-t_services', default=TRACING_SERVICES, - help='Collect services on this list of services') args, unknown = parser.parse_known_args() print(main(vars(args))) diff --git a/clover/collector/yaml/manifest.template b/clover/collector/yaml/manifest.template index c7aa3e7..795bd8f 100644 --- a/clover/collector/yaml/manifest.template +++ b/clover/collector/yaml/manifest.template @@ -5,6 +5,7 @@ metadata: name: {{ deploy_name }} labels: app: {{ deploy_name }} + namespace: clover-system spec: template: metadata: @@ -27,6 +28,7 @@ metadata: name: {{ deploy_name }} labels: app: {{ deploy_name }} + namespace: clover-system spec: ports: - port: {{ grpc_port }} diff --git a/clover/tools/yaml/cassandra.yaml b/clover/tools/yaml/cassandra.yaml index 0206d75..dc1c46f 100644 --- a/clover/tools/yaml/cassandra.yaml +++ b/clover/tools/yaml/cassandra.yaml @@ -36,6 +36,7 @@ metadata: labels: app: cassandra name: cassandra + namespace: clover-system spec: clusterIP: None ports: @@ -49,6 +50,7 @@ metadata: name: cassandra labels: app: cassandra + namespace: clover-system spec: serviceName: cassandra replicas: 1 @@ -76,18 +78,18 @@ spec: name: cql resources: limits: - cpu: "500m" - memory: 1Gi + cpu: "1000m" + memory: 5Gi requests: - cpu: "500m" - memory: 1Gi + cpu: "1000m" + memory: 5Gi env: - name: MAX_HEAP_SIZE value: 512M - name: HEAP_NEWSIZE value: 100M - name: CASSANDRA_SEEDS - value: "cassandra-0.cassandra.default.svc.cluster.local" + value: "cassandra-0.cassandra.clover-system.svc.cluster.local" - name: CASSANDRA_CLUSTER_NAME value: "MyCassandraDemo" - name: CASSANDRA_DC -- cgit 1.2.3-korg