diff options
author | Stephen Wong <stephen.kf.wong@gmail.com> | 2018-10-21 06:07:07 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@opnfv.org> | 2018-10-21 06:07:07 +0000 |
commit | b713d2f3fee2d8164bf16c7249d75c3da827005b (patch) | |
tree | 35ccdd08739634e08d749606f433541583a9d290 | |
parent | 72f00b63cf5cc1a53343cafcdd2b26be31d5ed9e (diff) | |
parent | c36f82e76df1f2f506f770935093828d3d573e6b (diff) |
Merge "Improve data ingestion reliability and functionality"
-rw-r--r-- | clover/collector/db/cassops.py | 54 | ||||
-rw-r--r-- | clover/collector/db/redisops.py | 14 | ||||
-rw-r--r-- | clover/collector/docker/Dockerfile | 9 | ||||
-rw-r--r-- | clover/collector/grpc/collector_client.py | 6 | ||||
-rw-r--r-- | clover/collector/grpc/collector_server.py | 2 | ||||
-rw-r--r-- | clover/collector/process/collect.py | 115 | ||||
-rw-r--r-- | clover/collector/yaml/manifest.template | 2 | ||||
-rw-r--r-- | clover/tools/yaml/cassandra.yaml | 12 |
8 files changed, 128 insertions, 86 deletions
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py index 6553cff..0bc9d84 100644 --- a/clover/collector/db/cassops.py +++ b/clover/collector/db/cassops.py @@ -9,7 +9,7 @@ from cassandra.cluster import Cluster from cassandra.query import BatchStatement import logging -CASSANDRA_HOSTS = ['cassandra.default'] +CASSANDRA_HOSTS = ['cassandra.clover-system'] class CassandraOps: @@ -57,13 +57,18 @@ class CassandraOps: spanid text, traceid text, duration int, - start_time int, + start_time timestamp, processid text, operation_name text, node_id text, http_url text, + user_agent text, + request_size text, + response_size text, + status_code text, upstream_cluster text, - PRIMARY KEY (spanid, traceid) + insert_time timestamp, + PRIMARY KEY (traceid, spanid) ) """) @@ -82,11 +87,18 @@ class CassandraOps: def set_prepared(self): self.session.set_keyspace(self.keyspace) - self.insert_tracing_stmt = self.session.prepare( + self.insert_span_stmt = self.session.prepare( """ INSERT INTO spans (spanid, traceid, duration, operation_name, - node_id, http_url, upstream_cluster) - VALUES (?, ?, ?, ?, ?, ?, ?) + node_id, http_url, upstream_cluster, start_time, user_agent, + request_size, response_size, status_code, insert_time) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now())) + """ + ) + self.insert_trace_stmt = self.session.prepare( + """ + INSERT INTO traces (traceid, processes) + VALUES (?, ?) """ ) self.insert_metric_stmt = self.session.prepare( @@ -103,31 +115,31 @@ class CassandraOps: def execute_batch(self): self.session.execute(self.batch) - def insert_tracing(self, table, traceid, s, tags): + def insert_span(self, traceid, s, tags): self.session.set_keyspace(self.keyspace) if 'upstream_cluster' not in tags: - logging.debug('NO UPSTREAM_CLUSTER KEY') + # logging.debug('NO UPSTREAM_CLUSTER KEY') tags['upstream_cluster'] = 'none' try: - self.batch.add(self.insert_tracing_stmt, + self.batch.add(self.insert_span_stmt, (s['spanID'], traceid, s['duration'], s['operationName'], tags['node_id'], - tags['http.url'], tags['upstream_cluster'])) + tags['http.url'], tags['upstream_cluster'], + int(str(s['startTime'])[0:13]), tags['user_agent'], + tags['request_size'], tags['response_size'], + tags['http.status_code'])) + except KeyError as e: + logging.debug('Insert span error: {}, Tags: {}'.format(e, tags)) except Exception as e: - logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid, - s['duration'], s['operationName'], tags['node_id'], - tags['http.url'], tags['upstream_cluster'])) - logging.debug(e) + logging.debug('Insert span error: {}'.format(e)) + logging.debug('Tags: {}'.format(tags)) + logging.debug('Span toplevel: {}'.format(s)) + logging.debug( + 'startTime: {}'.format(int(str(s['startTime'])[0:13]))) def insert_trace(self, traceid, processes): self.session.set_keyspace(self.keyspace) - self.session.execute( - """ - INSERT INTO traces (traceid, processes) - VALUES (%s, %s) - """, - (traceid, processes) - ) + self.batch.add(self.insert_trace_stmt, (traceid, processes)) def insert_metric(self, m_name, m_value, m_time, service): self.session.set_keyspace(self.keyspace) diff --git a/clover/collector/db/redisops.py b/clover/collector/db/redisops.py index e80c417..24fbeb9 100644 --- a/clover/collector/db/redisops.py +++ b/clover/collector/db/redisops.py @@ -8,8 +8,7 @@ import redis import logging -REDIS_HOST = 'redis' -# REDIS_HOST = '10.244.0.85' +REDIS_HOST = 'redis.default' class RedisOps: @@ -27,11 +26,16 @@ class RedisOps: for s in service_names: self.r.sadd(skey, s) + def set_tracing_services(self, services, skey='tracing_services'): + self.r.delete(skey) + for s in services: + self.r.sadd(skey, s) + def init_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'): - metric_prefixes = ['envoy_cluster_out_', 'envoy_cluster_in_'] + metric_prefixes = ['envoy_cluster_outbound_', 'envoy_cluster_inbound_'] metric_suffixes = [ - '_default_svc_cluster_local_http_internal_upstream_rq_2xx', - '_default_svc_cluster_local_http_upstream_cx_active'] + '_default_svc_cluster_local_upstream_rq_2xx', + '_default_svc_cluster_local_upstream_cx_active'] for p in metric_prefixes: self.r.sadd(pkey, p) for s in metric_suffixes: diff --git a/clover/collector/docker/Dockerfile b/clover/collector/docker/Dockerfile index 1714420..7b6effd 100644 --- a/clover/collector/docker/Dockerfile +++ b/clover/collector/docker/Dockerfile @@ -16,15 +16,6 @@ ENV CLOVER_REPO_DIR="${REPOS_DIR}/clover" RUN python -m pip install cassandra-driver redis # Set work directory -WORKDIR ${CLOVER_REPO_DIR} - -COPY /process clover/collector/process -COPY /grpc clover/collector/grpc -COPY /db clover/collector/db -COPY __init__.py clover/collector/__init__.py - -RUN pip install . - WORKDIR "${CLOVER_REPO_DIR}/clover/collector" CMD ./process/grpc_process.sh no_schema_init diff --git a/clover/collector/grpc/collector_client.py b/clover/collector/grpc/collector_client.py index b9e9f67..65ff2ff 100644 --- a/clover/collector/grpc/collector_client.py +++ b/clover/collector/grpc/collector_client.py @@ -55,7 +55,7 @@ def get_podip(pod_name): def init_visibility(stub): try: - cassandra_hosts = pickle.dumps(['cassandra.default']) + cassandra_hosts = pickle.dumps(['cassandra.clover-system']) response = stub.InitVisibility(collector_pb2.ConfigCassandra( cassandra_hosts=cassandra_hosts, cassandra_port=9042)) except Exception as e: @@ -65,7 +65,7 @@ def init_visibility(stub): def clean_visibility(stub): try: - cassandra_hosts = pickle.dumps(['cassandra.default']) + cassandra_hosts = pickle.dumps(['cassandra.clover-system']) schemas = pickle.dumps(['spans', 'traces', 'metrics']) response = stub.TruncateVisibility(collector_pb2.Schemas( schemas=schemas, cassandra_hosts=cassandra_hosts, @@ -77,7 +77,7 @@ def clean_visibility(stub): def start_collector(stub): try: - cassandra_hosts = pickle.dumps(['cassandra.default']) + cassandra_hosts = pickle.dumps(['cassandra.clover-system']) response = stub.StartCollector(collector_pb2.ConfigCollector( t_port='16686', t_host='jaeger-deployment.istio-system', m_port='9090', m_host='prometheus.istio-system', diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py index c2eb221..a10078e 100644 --- a/clover/collector/grpc/collector_server.py +++ b/clover/collector/grpc/collector_server.py @@ -29,7 +29,7 @@ class Controller(collector_pb2_grpc.ControllerServicer): level=logging.DEBUG) self.collector = 0 if init_visibility == 'set_schemas': - cassandra_hosts = pickle.dumps(['cassandra.default']) + cassandra_hosts = pickle.dumps(['cassandra.clover-system']) self.InitVisibility(collector_pb2.ConfigCassandra( cassandra_port=9042, cassandra_hosts=cassandra_hosts), "") diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py index d8beb49..3d9df8a 100644 --- a/clover/collector/process/collect.py +++ b/clover/collector/process/collect.py @@ -16,19 +16,25 @@ import argparse import logging import ast -TRACING_SERVICES = ['istio-ingress'] TRACING_PORT = "16686" MONITORING_PORT = "9090" CASSANDRA_PORT = 9042 # Provide as integer MONITORING_HOST = "prometheus.istio-system" -TRACING_HOST = "jaeger-deployment.istio-system" -CASSANDRA_HOSTS = ['cassandra.default'] +TRACING_HOST = "tracing.istio-system" +CASSANDRA_HOSTS = ['cassandra.clover-system'] class Collector: def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts): - logging.basicConfig(filename='collector.log', level=logging.DEBUG) + + # logging.basicConfig(filename='collector.log', level=logging.DEBUG) + logging.basicConfig(filename='collector.log', level=logging.ERROR) + # logging.getLogger("requests").setLevel(logging.DEBUG) + logging.getLogger("requests").setLevel(logging.ERROR) + # logging.getLogger("urllib3").setLevel(logging.DEBUG) + logging.getLogger("urllib3").setLevel(logging.ERROR) + try: self.t = Tracing(t_host, t_port, '', False) monitoring_url = "http://{}:{}".format(m_host, m_port) @@ -40,63 +46,89 @@ class Collector: logging.debug(e) # Toplevel tracing retrieval and batch insert - def get_tracing(self, services, time_back=20): - self.c.set_batch() - for service in services: - traces = self.t.getTraces(service, time_back) - try: - self.set_tracing(traces) - except Exception as e: - logging.debug(e) - self.c.execute_batch() + def get_tracing(self, time_back=300): + try: + services = self.r.get_services() + for service in services: + traces = self.t.getTraces(service.replace("_", "-"), time_back, + '20000') + try: + self.set_tracing(traces) + except Exception as e: + logging.debug(e) + + # Update list of available services from tracing + services = self.t.getServices() + self.r.set_tracing_services(services) + except Exception as e: + logging.debug(e) # Insert to cassandra visibility traces and spans tables def set_tracing(self, trace): for traces in trace['data']: + self.c.set_batch() for spans in traces['spans']: + try: span = {} span['spanID'] = spans['spanID'] span['duration'] = spans['duration'] span['startTime'] = spans['startTime'] span['operationName'] = spans['operationName'] + tag = {} for tags in spans['tags']: tag[tags['key']] = tags['value'] - self.c.insert_tracing('spans', traces['traceID'], - span, tag) + self.c.insert_span(traces['traceID'], span, tag) + except Exception as e: + logging.debug("spans loop") + logging.debug(e) + process_list = [] for p in traces['processes']: process_list.append(p) service_names = [] for pname in process_list: service_names.append(traces['processes'][pname]['serviceName']) - self.c.insert_trace(traces['traceID'], service_names) + try: + self.c.insert_trace(traces['traceID'], service_names) + self.c.execute_batch() + except Exception as e: + logging.debug(e) # Insert to cassandra visibility metrics table def get_monitoring(self): - # Fetch collector service/metric lists from redis - service_names = self.r.get_services() - metric_prefixes, metric_suffixes = self.r.get_metrics() - - self.c.set_batch() - for sname in service_names: - for prefix in metric_prefixes: - for suffix in metric_suffixes: - try: - metric_name = prefix + sname + suffix - query_params = { - "type": "instant", - "query": metric_name - } - data = self.m.query(query_params) - m_value = data['data']['result'][0]['value'][1] - m_time = data['data']['result'][0]['value'][0] - mn = data['data']['result'][0]['metric']['__name__'] - self.c.insert_metric(mn, m_value, str(m_time), sname) - except Exception as e: - logging.debug(e) - self.c.execute_batch() + try: + # Fetch collector service/metric lists from redis + service_names = self.r.get_services() + metric_prefixes, metric_suffixes = self.r.get_metrics() + + self.c.set_batch() + for sname in service_names: + for prefix in metric_prefixes: + for suffix in metric_suffixes: + try: + metric_name = prefix + sname + suffix + query_params = { + "type": "instant", + "query": metric_name + } + data = self.m.query(query_params) + m_value = data['data']['result'][0]['value'][1] + m_time = data['data']['result'][0]['value'][0] + mn = data[ + 'data']['result'][0]['metric']['__name__'] + self.c.insert_metric( + mn, m_value, str(m_time), sname) + + # Add to redis temporarily + self.r.r.set(mn, m_value) + + except Exception as e: + logging.debug(e) + self.c.execute_batch() + except Exception as e: + logging.debug(e) # TODO add batch retrieval for monitoring metrics # query_range_param = { @@ -124,11 +156,13 @@ def main(args): loop = True while loop: try: - c.get_tracing(args['t_services']) + c.get_tracing() c.get_monitoring() time.sleep(int(args['sinterval'])) except KeyboardInterrupt: loop = False + except Exception as e: + logging.debug(e) if __name__ == '__main__': @@ -154,9 +188,6 @@ if __name__ == '__main__': parser.add_argument( '-c_port', default=CASSANDRA_PORT, help='Port to access Cassandra cluster') - parser.add_argument( - '-t_services', default=TRACING_SERVICES, - help='Collect services on this list of services') args, unknown = parser.parse_known_args() print(main(vars(args))) diff --git a/clover/collector/yaml/manifest.template b/clover/collector/yaml/manifest.template index c7aa3e7..795bd8f 100644 --- a/clover/collector/yaml/manifest.template +++ b/clover/collector/yaml/manifest.template @@ -5,6 +5,7 @@ metadata: name: {{ deploy_name }} labels: app: {{ deploy_name }} + namespace: clover-system spec: template: metadata: @@ -27,6 +28,7 @@ metadata: name: {{ deploy_name }} labels: app: {{ deploy_name }} + namespace: clover-system spec: ports: - port: {{ grpc_port }} diff --git a/clover/tools/yaml/cassandra.yaml b/clover/tools/yaml/cassandra.yaml index 0206d75..dc1c46f 100644 --- a/clover/tools/yaml/cassandra.yaml +++ b/clover/tools/yaml/cassandra.yaml @@ -36,6 +36,7 @@ metadata: labels: app: cassandra name: cassandra + namespace: clover-system spec: clusterIP: None ports: @@ -49,6 +50,7 @@ metadata: name: cassandra labels: app: cassandra + namespace: clover-system spec: serviceName: cassandra replicas: 1 @@ -76,18 +78,18 @@ spec: name: cql resources: limits: - cpu: "500m" - memory: 1Gi + cpu: "1000m" + memory: 5Gi requests: - cpu: "500m" - memory: 1Gi + cpu: "1000m" + memory: 5Gi env: - name: MAX_HEAP_SIZE value: 512M - name: HEAP_NEWSIZE value: 100M - name: CASSANDRA_SEEDS - value: "cassandra-0.cassandra.default.svc.cluster.local" + value: "cassandra-0.cassandra.clover-system.svc.cluster.local" - name: CASSANDRA_CLUSTER_NAME value: "MyCassandraDemo" - name: CASSANDRA_DC |