diff options
author | earrage <eddie.arrage@huawei.com> | 2018-10-10 11:54:51 -0700 |
---|---|---|
committer | Eddie Arrage <eddie.arrage@huawei.com> | 2018-10-12 01:07:26 +0000 |
commit | 29234dd20c49fe62734b723f1961c70ac6f1db08 (patch) | |
tree | 8640c5b37db1e88c82e828f250ccfcdc04b8f26f /clover/collector/db/cassops.py | |
parent | ee2169ee4b8fb3539ad173fbc1557b54b2f2216f (diff) |
Improve data ingestion reliability and functionality
- Modify deployment namespace to clover-system and account
for cassandra moving to the clover-system namespace
- Increase k8s compute resource assigned to cassandra to deal
with performance issues
- Add additional fields (user-agent, request/response size,
status codes) to span schema definition and modify primary keys
- Improve exception handling to prevent collect process from
crashing
- Minor changes to support tracing/monitoring with Istio 1.0
- Inhibit logging for debug messages
- Increase time back and number of traces to fetch in
each sampling interval to deal with Jaeger REST interface
returning trace data out of order under load
(tested to 300 conn/sec; 12K connections currently)
- Move trace insert into batch mode to cassandra
- Read visibility services to analyze from redis rather than
defaults (cloverctl, UI or clover-controller REST will set)
- Remove local directory copies in docker build, as image is
based on base clover container
Change-Id: Ibae98ef5057e52a6eeddd9ebbcfaeb644caec36c
Signed-off-by: earrage <eddie.arrage@huawei.com>
Diffstat (limited to 'clover/collector/db/cassops.py')
-rw-r--r-- | clover/collector/db/cassops.py | 54 |
1 files changed, 33 insertions, 21 deletions
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py index 6553cff..0bc9d84 100644 --- a/clover/collector/db/cassops.py +++ b/clover/collector/db/cassops.py @@ -9,7 +9,7 @@ from cassandra.cluster import Cluster from cassandra.query import BatchStatement import logging -CASSANDRA_HOSTS = ['cassandra.default'] +CASSANDRA_HOSTS = ['cassandra.clover-system'] class CassandraOps: @@ -57,13 +57,18 @@ class CassandraOps: spanid text, traceid text, duration int, - start_time int, + start_time timestamp, processid text, operation_name text, node_id text, http_url text, + user_agent text, + request_size text, + response_size text, + status_code text, upstream_cluster text, - PRIMARY KEY (spanid, traceid) + insert_time timestamp, + PRIMARY KEY (traceid, spanid) ) """) @@ -82,11 +87,18 @@ class CassandraOps: def set_prepared(self): self.session.set_keyspace(self.keyspace) - self.insert_tracing_stmt = self.session.prepare( + self.insert_span_stmt = self.session.prepare( """ INSERT INTO spans (spanid, traceid, duration, operation_name, - node_id, http_url, upstream_cluster) - VALUES (?, ?, ?, ?, ?, ?, ?) + node_id, http_url, upstream_cluster, start_time, user_agent, + request_size, response_size, status_code, insert_time) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now())) + """ + ) + self.insert_trace_stmt = self.session.prepare( + """ + INSERT INTO traces (traceid, processes) + VALUES (?, ?) """ ) self.insert_metric_stmt = self.session.prepare( @@ -103,31 +115,31 @@ class CassandraOps: def execute_batch(self): self.session.execute(self.batch) - def insert_tracing(self, table, traceid, s, tags): + def insert_span(self, traceid, s, tags): self.session.set_keyspace(self.keyspace) if 'upstream_cluster' not in tags: - logging.debug('NO UPSTREAM_CLUSTER KEY') + # logging.debug('NO UPSTREAM_CLUSTER KEY') tags['upstream_cluster'] = 'none' try: - self.batch.add(self.insert_tracing_stmt, + self.batch.add(self.insert_span_stmt, (s['spanID'], traceid, s['duration'], s['operationName'], tags['node_id'], - tags['http.url'], tags['upstream_cluster'])) + tags['http.url'], tags['upstream_cluster'], + int(str(s['startTime'])[0:13]), tags['user_agent'], + tags['request_size'], tags['response_size'], + tags['http.status_code'])) + except KeyError as e: + logging.debug('Insert span error: {}, Tags: {}'.format(e, tags)) except Exception as e: - logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid, - s['duration'], s['operationName'], tags['node_id'], - tags['http.url'], tags['upstream_cluster'])) - logging.debug(e) + logging.debug('Insert span error: {}'.format(e)) + logging.debug('Tags: {}'.format(tags)) + logging.debug('Span toplevel: {}'.format(s)) + logging.debug( + 'startTime: {}'.format(int(str(s['startTime'])[0:13]))) def insert_trace(self, traceid, processes): self.session.set_keyspace(self.keyspace) - self.session.execute( - """ - INSERT INTO traces (traceid, processes) - VALUES (%s, %s) - """, - (traceid, processes) - ) + self.batch.add(self.insert_trace_stmt, (traceid, processes)) def insert_metric(self, m_name, m_value, m_time, service): self.session.set_keyspace(self.keyspace) |