diff options
Diffstat (limited to 'clover/collector/db/cassops.py')
-rw-r--r-- | clover/collector/db/cassops.py | 54 |
1 files changed, 33 insertions, 21 deletions
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py index 6553cff..0bc9d84 100644 --- a/clover/collector/db/cassops.py +++ b/clover/collector/db/cassops.py @@ -9,7 +9,7 @@ from cassandra.cluster import Cluster from cassandra.query import BatchStatement import logging -CASSANDRA_HOSTS = ['cassandra.default'] +CASSANDRA_HOSTS = ['cassandra.clover-system'] class CassandraOps: @@ -57,13 +57,18 @@ class CassandraOps: spanid text, traceid text, duration int, - start_time int, + start_time timestamp, processid text, operation_name text, node_id text, http_url text, + user_agent text, + request_size text, + response_size text, + status_code text, upstream_cluster text, - PRIMARY KEY (spanid, traceid) + insert_time timestamp, + PRIMARY KEY (traceid, spanid) ) """) @@ -82,11 +87,18 @@ class CassandraOps: def set_prepared(self): self.session.set_keyspace(self.keyspace) - self.insert_tracing_stmt = self.session.prepare( + self.insert_span_stmt = self.session.prepare( """ INSERT INTO spans (spanid, traceid, duration, operation_name, - node_id, http_url, upstream_cluster) - VALUES (?, ?, ?, ?, ?, ?, ?) + node_id, http_url, upstream_cluster, start_time, user_agent, + request_size, response_size, status_code, insert_time) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now())) + """ + ) + self.insert_trace_stmt = self.session.prepare( + """ + INSERT INTO traces (traceid, processes) + VALUES (?, ?) """ ) self.insert_metric_stmt = self.session.prepare( @@ -103,31 +115,31 @@ class CassandraOps: def execute_batch(self): self.session.execute(self.batch) - def insert_tracing(self, table, traceid, s, tags): + def insert_span(self, traceid, s, tags): self.session.set_keyspace(self.keyspace) if 'upstream_cluster' not in tags: - logging.debug('NO UPSTREAM_CLUSTER KEY') + # logging.debug('NO UPSTREAM_CLUSTER KEY') tags['upstream_cluster'] = 'none' try: - self.batch.add(self.insert_tracing_stmt, + self.batch.add(self.insert_span_stmt, (s['spanID'], traceid, s['duration'], s['operationName'], tags['node_id'], - tags['http.url'], tags['upstream_cluster'])) + tags['http.url'], tags['upstream_cluster'], + int(str(s['startTime'])[0:13]), tags['user_agent'], + tags['request_size'], tags['response_size'], + tags['http.status_code'])) + except KeyError as e: + logging.debug('Insert span error: {}, Tags: {}'.format(e, tags)) except Exception as e: - logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid, - s['duration'], s['operationName'], tags['node_id'], - tags['http.url'], tags['upstream_cluster'])) - logging.debug(e) + logging.debug('Insert span error: {}'.format(e)) + logging.debug('Tags: {}'.format(tags)) + logging.debug('Span toplevel: {}'.format(s)) + logging.debug( + 'startTime: {}'.format(int(str(s['startTime'])[0:13]))) def insert_trace(self, traceid, processes): self.session.set_keyspace(self.keyspace) - self.session.execute( - """ - INSERT INTO traces (traceid, processes) - VALUES (%s, %s) - """, - (traceid, processes) - ) + self.batch.add(self.insert_trace_stmt, (traceid, processes)) def insert_metric(self, m_name, m_value, m_time, service): self.session.set_keyspace(self.keyspace) |