summaryrefslogtreecommitdiffstats
path: root/clover/collector/db/cassops.py
diff options
context:
space:
mode:
authorearrage <eddie.arrage@huawei.com>2018-10-10 11:54:51 -0700
committerEddie Arrage <eddie.arrage@huawei.com>2018-10-12 01:07:26 +0000
commit29234dd20c49fe62734b723f1961c70ac6f1db08 (patch)
tree8640c5b37db1e88c82e828f250ccfcdc04b8f26f /clover/collector/db/cassops.py
parentee2169ee4b8fb3539ad173fbc1557b54b2f2216f (diff)
Improve data ingestion reliability and functionality
- Modify deployment namespace to clover-system and account for cassandra moving to the clover-system namespace - Increase k8s compute resource assigned to cassandra to deal with performance issues - Add additional fields (user-agent, request/response size, status codes) to span schema definition and modify primary keys - Improve exception handling to prevent collect process from crashing - Minor changes to support tracing/monitoring with Istio 1.0 - Inhibit logging for debug messages - Increase time back and number of traces to fetch in each sampling interval to deal with Jaeger REST interface returning trace data out of order under load (tested to 300 conn/sec; 12K connections currently) - Move trace insert into batch mode to cassandra - Read visibility services to analyze from redis rather than defaults (cloverctl, UI or clover-controller REST will set) - Remove local directory copies in docker build, as image is based on base clover container Change-Id: Ibae98ef5057e52a6eeddd9ebbcfaeb644caec36c Signed-off-by: earrage <eddie.arrage@huawei.com>
Diffstat (limited to 'clover/collector/db/cassops.py')
-rw-r--r--clover/collector/db/cassops.py54
1 files changed, 33 insertions, 21 deletions
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py
index 6553cff..0bc9d84 100644
--- a/clover/collector/db/cassops.py
+++ b/clover/collector/db/cassops.py
@@ -9,7 +9,7 @@ from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
import logging
-CASSANDRA_HOSTS = ['cassandra.default']
+CASSANDRA_HOSTS = ['cassandra.clover-system']
class CassandraOps:
@@ -57,13 +57,18 @@ class CassandraOps:
spanid text,
traceid text,
duration int,
- start_time int,
+ start_time timestamp,
processid text,
operation_name text,
node_id text,
http_url text,
+ user_agent text,
+ request_size text,
+ response_size text,
+ status_code text,
upstream_cluster text,
- PRIMARY KEY (spanid, traceid)
+ insert_time timestamp,
+ PRIMARY KEY (traceid, spanid)
)
""")
@@ -82,11 +87,18 @@ class CassandraOps:
def set_prepared(self):
self.session.set_keyspace(self.keyspace)
- self.insert_tracing_stmt = self.session.prepare(
+ self.insert_span_stmt = self.session.prepare(
"""
INSERT INTO spans (spanid, traceid, duration, operation_name,
- node_id, http_url, upstream_cluster)
- VALUES (?, ?, ?, ?, ?, ?, ?)
+ node_id, http_url, upstream_cluster, start_time, user_agent,
+ request_size, response_size, status_code, insert_time)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now()))
+ """
+ )
+ self.insert_trace_stmt = self.session.prepare(
+ """
+ INSERT INTO traces (traceid, processes)
+ VALUES (?, ?)
"""
)
self.insert_metric_stmt = self.session.prepare(
@@ -103,31 +115,31 @@ class CassandraOps:
def execute_batch(self):
self.session.execute(self.batch)
- def insert_tracing(self, table, traceid, s, tags):
+ def insert_span(self, traceid, s, tags):
self.session.set_keyspace(self.keyspace)
if 'upstream_cluster' not in tags:
- logging.debug('NO UPSTREAM_CLUSTER KEY')
+ # logging.debug('NO UPSTREAM_CLUSTER KEY')
tags['upstream_cluster'] = 'none'
try:
- self.batch.add(self.insert_tracing_stmt,
+ self.batch.add(self.insert_span_stmt,
(s['spanID'], traceid, s['duration'],
s['operationName'], tags['node_id'],
- tags['http.url'], tags['upstream_cluster']))
+ tags['http.url'], tags['upstream_cluster'],
+ int(str(s['startTime'])[0:13]), tags['user_agent'],
+ tags['request_size'], tags['response_size'],
+ tags['http.status_code']))
+ except KeyError as e:
+ logging.debug('Insert span error: {}, Tags: {}'.format(e, tags))
except Exception as e:
- logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid,
- s['duration'], s['operationName'], tags['node_id'],
- tags['http.url'], tags['upstream_cluster']))
- logging.debug(e)
+ logging.debug('Insert span error: {}'.format(e))
+ logging.debug('Tags: {}'.format(tags))
+ logging.debug('Span toplevel: {}'.format(s))
+ logging.debug(
+ 'startTime: {}'.format(int(str(s['startTime'])[0:13])))
def insert_trace(self, traceid, processes):
self.session.set_keyspace(self.keyspace)
- self.session.execute(
- """
- INSERT INTO traces (traceid, processes)
- VALUES (%s, %s)
- """,
- (traceid, processes)
- )
+ self.batch.add(self.insert_trace_stmt, (traceid, processes))
def insert_metric(self, m_name, m_value, m_time, service):
self.session.set_keyspace(self.keyspace)