summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorearrage <eddie.arrage@huawei.com>2018-10-10 11:54:51 -0700
committerEddie Arrage <eddie.arrage@huawei.com>2018-10-12 01:07:26 +0000
commit29234dd20c49fe62734b723f1961c70ac6f1db08 (patch)
tree8640c5b37db1e88c82e828f250ccfcdc04b8f26f
parentee2169ee4b8fb3539ad173fbc1557b54b2f2216f (diff)
Improve data ingestion reliability and functionality
- Modify deployment namespace to clover-system and account for cassandra moving to the clover-system namespace - Increase k8s compute resource assigned to cassandra to deal with performance issues - Add additional fields (user-agent, request/response size, status codes) to span schema definition and modify primary keys - Improve exception handling to prevent collect process from crashing - Minor changes to support tracing/monitoring with Istio 1.0 - Inhibit logging for debug messages - Increase time back and number of traces to fetch in each sampling interval to deal with Jaeger REST interface returning trace data out of order under load (tested to 300 conn/sec; 12K connections currently) - Move trace insert into batch mode to cassandra - Read visibility services to analyze from redis rather than defaults (cloverctl, UI or clover-controller REST will set) - Remove local directory copies in docker build, as image is based on base clover container Change-Id: Ibae98ef5057e52a6eeddd9ebbcfaeb644caec36c Signed-off-by: earrage <eddie.arrage@huawei.com>
-rw-r--r--clover/collector/db/cassops.py54
-rw-r--r--clover/collector/db/redisops.py14
-rw-r--r--clover/collector/docker/Dockerfile9
-rw-r--r--clover/collector/grpc/collector_client.py6
-rw-r--r--clover/collector/grpc/collector_server.py2
-rw-r--r--clover/collector/process/collect.py115
-rw-r--r--clover/collector/yaml/manifest.template2
-rw-r--r--clover/tools/yaml/cassandra.yaml12
8 files changed, 128 insertions, 86 deletions
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py
index 6553cff..0bc9d84 100644
--- a/clover/collector/db/cassops.py
+++ b/clover/collector/db/cassops.py
@@ -9,7 +9,7 @@ from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
import logging
-CASSANDRA_HOSTS = ['cassandra.default']
+CASSANDRA_HOSTS = ['cassandra.clover-system']
class CassandraOps:
@@ -57,13 +57,18 @@ class CassandraOps:
spanid text,
traceid text,
duration int,
- start_time int,
+ start_time timestamp,
processid text,
operation_name text,
node_id text,
http_url text,
+ user_agent text,
+ request_size text,
+ response_size text,
+ status_code text,
upstream_cluster text,
- PRIMARY KEY (spanid, traceid)
+ insert_time timestamp,
+ PRIMARY KEY (traceid, spanid)
)
""")
@@ -82,11 +87,18 @@ class CassandraOps:
def set_prepared(self):
self.session.set_keyspace(self.keyspace)
- self.insert_tracing_stmt = self.session.prepare(
+ self.insert_span_stmt = self.session.prepare(
"""
INSERT INTO spans (spanid, traceid, duration, operation_name,
- node_id, http_url, upstream_cluster)
- VALUES (?, ?, ?, ?, ?, ?, ?)
+ node_id, http_url, upstream_cluster, start_time, user_agent,
+ request_size, response_size, status_code, insert_time)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now()))
+ """
+ )
+ self.insert_trace_stmt = self.session.prepare(
+ """
+ INSERT INTO traces (traceid, processes)
+ VALUES (?, ?)
"""
)
self.insert_metric_stmt = self.session.prepare(
@@ -103,31 +115,31 @@ class CassandraOps:
def execute_batch(self):
self.session.execute(self.batch)
- def insert_tracing(self, table, traceid, s, tags):
+ def insert_span(self, traceid, s, tags):
self.session.set_keyspace(self.keyspace)
if 'upstream_cluster' not in tags:
- logging.debug('NO UPSTREAM_CLUSTER KEY')
+ # logging.debug('NO UPSTREAM_CLUSTER KEY')
tags['upstream_cluster'] = 'none'
try:
- self.batch.add(self.insert_tracing_stmt,
+ self.batch.add(self.insert_span_stmt,
(s['spanID'], traceid, s['duration'],
s['operationName'], tags['node_id'],
- tags['http.url'], tags['upstream_cluster']))
+ tags['http.url'], tags['upstream_cluster'],
+ int(str(s['startTime'])[0:13]), tags['user_agent'],
+ tags['request_size'], tags['response_size'],
+ tags['http.status_code']))
+ except KeyError as e:
+ logging.debug('Insert span error: {}, Tags: {}'.format(e, tags))
except Exception as e:
- logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid,
- s['duration'], s['operationName'], tags['node_id'],
- tags['http.url'], tags['upstream_cluster']))
- logging.debug(e)
+ logging.debug('Insert span error: {}'.format(e))
+ logging.debug('Tags: {}'.format(tags))
+ logging.debug('Span toplevel: {}'.format(s))
+ logging.debug(
+ 'startTime: {}'.format(int(str(s['startTime'])[0:13])))
def insert_trace(self, traceid, processes):
self.session.set_keyspace(self.keyspace)
- self.session.execute(
- """
- INSERT INTO traces (traceid, processes)
- VALUES (%s, %s)
- """,
- (traceid, processes)
- )
+ self.batch.add(self.insert_trace_stmt, (traceid, processes))
def insert_metric(self, m_name, m_value, m_time, service):
self.session.set_keyspace(self.keyspace)
diff --git a/clover/collector/db/redisops.py b/clover/collector/db/redisops.py
index e80c417..24fbeb9 100644
--- a/clover/collector/db/redisops.py
+++ b/clover/collector/db/redisops.py
@@ -8,8 +8,7 @@
import redis
import logging
-REDIS_HOST = 'redis'
-# REDIS_HOST = '10.244.0.85'
+REDIS_HOST = 'redis.default'
class RedisOps:
@@ -27,11 +26,16 @@ class RedisOps:
for s in service_names:
self.r.sadd(skey, s)
+ def set_tracing_services(self, services, skey='tracing_services'):
+ self.r.delete(skey)
+ for s in services:
+ self.r.sadd(skey, s)
+
def init_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'):
- metric_prefixes = ['envoy_cluster_out_', 'envoy_cluster_in_']
+ metric_prefixes = ['envoy_cluster_outbound_', 'envoy_cluster_inbound_']
metric_suffixes = [
- '_default_svc_cluster_local_http_internal_upstream_rq_2xx',
- '_default_svc_cluster_local_http_upstream_cx_active']
+ '_default_svc_cluster_local_upstream_rq_2xx',
+ '_default_svc_cluster_local_upstream_cx_active']
for p in metric_prefixes:
self.r.sadd(pkey, p)
for s in metric_suffixes:
diff --git a/clover/collector/docker/Dockerfile b/clover/collector/docker/Dockerfile
index 1714420..7b6effd 100644
--- a/clover/collector/docker/Dockerfile
+++ b/clover/collector/docker/Dockerfile
@@ -16,15 +16,6 @@ ENV CLOVER_REPO_DIR="${REPOS_DIR}/clover"
RUN python -m pip install cassandra-driver redis
# Set work directory
-WORKDIR ${CLOVER_REPO_DIR}
-
-COPY /process clover/collector/process
-COPY /grpc clover/collector/grpc
-COPY /db clover/collector/db
-COPY __init__.py clover/collector/__init__.py
-
-RUN pip install .
-
WORKDIR "${CLOVER_REPO_DIR}/clover/collector"
CMD ./process/grpc_process.sh no_schema_init
diff --git a/clover/collector/grpc/collector_client.py b/clover/collector/grpc/collector_client.py
index b9e9f67..65ff2ff 100644
--- a/clover/collector/grpc/collector_client.py
+++ b/clover/collector/grpc/collector_client.py
@@ -55,7 +55,7 @@ def get_podip(pod_name):
def init_visibility(stub):
try:
- cassandra_hosts = pickle.dumps(['cassandra.default'])
+ cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
response = stub.InitVisibility(collector_pb2.ConfigCassandra(
cassandra_hosts=cassandra_hosts, cassandra_port=9042))
except Exception as e:
@@ -65,7 +65,7 @@ def init_visibility(stub):
def clean_visibility(stub):
try:
- cassandra_hosts = pickle.dumps(['cassandra.default'])
+ cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
schemas = pickle.dumps(['spans', 'traces', 'metrics'])
response = stub.TruncateVisibility(collector_pb2.Schemas(
schemas=schemas, cassandra_hosts=cassandra_hosts,
@@ -77,7 +77,7 @@ def clean_visibility(stub):
def start_collector(stub):
try:
- cassandra_hosts = pickle.dumps(['cassandra.default'])
+ cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
response = stub.StartCollector(collector_pb2.ConfigCollector(
t_port='16686', t_host='jaeger-deployment.istio-system',
m_port='9090', m_host='prometheus.istio-system',
diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py
index c2eb221..a10078e 100644
--- a/clover/collector/grpc/collector_server.py
+++ b/clover/collector/grpc/collector_server.py
@@ -29,7 +29,7 @@ class Controller(collector_pb2_grpc.ControllerServicer):
level=logging.DEBUG)
self.collector = 0
if init_visibility == 'set_schemas':
- cassandra_hosts = pickle.dumps(['cassandra.default'])
+ cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
self.InitVisibility(collector_pb2.ConfigCassandra(
cassandra_port=9042, cassandra_hosts=cassandra_hosts), "")
diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py
index d8beb49..3d9df8a 100644
--- a/clover/collector/process/collect.py
+++ b/clover/collector/process/collect.py
@@ -16,19 +16,25 @@ import argparse
import logging
import ast
-TRACING_SERVICES = ['istio-ingress']
TRACING_PORT = "16686"
MONITORING_PORT = "9090"
CASSANDRA_PORT = 9042 # Provide as integer
MONITORING_HOST = "prometheus.istio-system"
-TRACING_HOST = "jaeger-deployment.istio-system"
-CASSANDRA_HOSTS = ['cassandra.default']
+TRACING_HOST = "tracing.istio-system"
+CASSANDRA_HOSTS = ['cassandra.clover-system']
class Collector:
def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts):
- logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+
+ # logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+ logging.basicConfig(filename='collector.log', level=logging.ERROR)
+ # logging.getLogger("requests").setLevel(logging.DEBUG)
+ logging.getLogger("requests").setLevel(logging.ERROR)
+ # logging.getLogger("urllib3").setLevel(logging.DEBUG)
+ logging.getLogger("urllib3").setLevel(logging.ERROR)
+
try:
self.t = Tracing(t_host, t_port, '', False)
monitoring_url = "http://{}:{}".format(m_host, m_port)
@@ -40,63 +46,89 @@ class Collector:
logging.debug(e)
# Toplevel tracing retrieval and batch insert
- def get_tracing(self, services, time_back=20):
- self.c.set_batch()
- for service in services:
- traces = self.t.getTraces(service, time_back)
- try:
- self.set_tracing(traces)
- except Exception as e:
- logging.debug(e)
- self.c.execute_batch()
+ def get_tracing(self, time_back=300):
+ try:
+ services = self.r.get_services()
+ for service in services:
+ traces = self.t.getTraces(service.replace("_", "-"), time_back,
+ '20000')
+ try:
+ self.set_tracing(traces)
+ except Exception as e:
+ logging.debug(e)
+
+ # Update list of available services from tracing
+ services = self.t.getServices()
+ self.r.set_tracing_services(services)
+ except Exception as e:
+ logging.debug(e)
# Insert to cassandra visibility traces and spans tables
def set_tracing(self, trace):
for traces in trace['data']:
+ self.c.set_batch()
for spans in traces['spans']:
+ try:
span = {}
span['spanID'] = spans['spanID']
span['duration'] = spans['duration']
span['startTime'] = spans['startTime']
span['operationName'] = spans['operationName']
+
tag = {}
for tags in spans['tags']:
tag[tags['key']] = tags['value']
- self.c.insert_tracing('spans', traces['traceID'],
- span, tag)
+ self.c.insert_span(traces['traceID'], span, tag)
+ except Exception as e:
+ logging.debug("spans loop")
+ logging.debug(e)
+
process_list = []
for p in traces['processes']:
process_list.append(p)
service_names = []
for pname in process_list:
service_names.append(traces['processes'][pname]['serviceName'])
- self.c.insert_trace(traces['traceID'], service_names)
+ try:
+ self.c.insert_trace(traces['traceID'], service_names)
+ self.c.execute_batch()
+ except Exception as e:
+ logging.debug(e)
# Insert to cassandra visibility metrics table
def get_monitoring(self):
- # Fetch collector service/metric lists from redis
- service_names = self.r.get_services()
- metric_prefixes, metric_suffixes = self.r.get_metrics()
-
- self.c.set_batch()
- for sname in service_names:
- for prefix in metric_prefixes:
- for suffix in metric_suffixes:
- try:
- metric_name = prefix + sname + suffix
- query_params = {
- "type": "instant",
- "query": metric_name
- }
- data = self.m.query(query_params)
- m_value = data['data']['result'][0]['value'][1]
- m_time = data['data']['result'][0]['value'][0]
- mn = data['data']['result'][0]['metric']['__name__']
- self.c.insert_metric(mn, m_value, str(m_time), sname)
- except Exception as e:
- logging.debug(e)
- self.c.execute_batch()
+ try:
+ # Fetch collector service/metric lists from redis
+ service_names = self.r.get_services()
+ metric_prefixes, metric_suffixes = self.r.get_metrics()
+
+ self.c.set_batch()
+ for sname in service_names:
+ for prefix in metric_prefixes:
+ for suffix in metric_suffixes:
+ try:
+ metric_name = prefix + sname + suffix
+ query_params = {
+ "type": "instant",
+ "query": metric_name
+ }
+ data = self.m.query(query_params)
+ m_value = data['data']['result'][0]['value'][1]
+ m_time = data['data']['result'][0]['value'][0]
+ mn = data[
+ 'data']['result'][0]['metric']['__name__']
+ self.c.insert_metric(
+ mn, m_value, str(m_time), sname)
+
+ # Add to redis temporarily
+ self.r.r.set(mn, m_value)
+
+ except Exception as e:
+ logging.debug(e)
+ self.c.execute_batch()
+ except Exception as e:
+ logging.debug(e)
# TODO add batch retrieval for monitoring metrics
# query_range_param = {
@@ -124,11 +156,13 @@ def main(args):
loop = True
while loop:
try:
- c.get_tracing(args['t_services'])
+ c.get_tracing()
c.get_monitoring()
time.sleep(int(args['sinterval']))
except KeyboardInterrupt:
loop = False
+ except Exception as e:
+ logging.debug(e)
if __name__ == '__main__':
@@ -154,9 +188,6 @@ if __name__ == '__main__':
parser.add_argument(
'-c_port', default=CASSANDRA_PORT,
help='Port to access Cassandra cluster')
- parser.add_argument(
- '-t_services', default=TRACING_SERVICES,
- help='Collect services on this list of services')
args, unknown = parser.parse_known_args()
print(main(vars(args)))
diff --git a/clover/collector/yaml/manifest.template b/clover/collector/yaml/manifest.template
index c7aa3e7..795bd8f 100644
--- a/clover/collector/yaml/manifest.template
+++ b/clover/collector/yaml/manifest.template
@@ -5,6 +5,7 @@ metadata:
name: {{ deploy_name }}
labels:
app: {{ deploy_name }}
+ namespace: clover-system
spec:
template:
metadata:
@@ -27,6 +28,7 @@ metadata:
name: {{ deploy_name }}
labels:
app: {{ deploy_name }}
+ namespace: clover-system
spec:
ports:
- port: {{ grpc_port }}
diff --git a/clover/tools/yaml/cassandra.yaml b/clover/tools/yaml/cassandra.yaml
index 0206d75..dc1c46f 100644
--- a/clover/tools/yaml/cassandra.yaml
+++ b/clover/tools/yaml/cassandra.yaml
@@ -36,6 +36,7 @@ metadata:
labels:
app: cassandra
name: cassandra
+ namespace: clover-system
spec:
clusterIP: None
ports:
@@ -49,6 +50,7 @@ metadata:
name: cassandra
labels:
app: cassandra
+ namespace: clover-system
spec:
serviceName: cassandra
replicas: 1
@@ -76,18 +78,18 @@ spec:
name: cql
resources:
limits:
- cpu: "500m"
- memory: 1Gi
+ cpu: "1000m"
+ memory: 5Gi
requests:
- cpu: "500m"
- memory: 1Gi
+ cpu: "1000m"
+ memory: 5Gi
env:
- name: MAX_HEAP_SIZE
value: 512M
- name: HEAP_NEWSIZE
value: 100M
- name: CASSANDRA_SEEDS
- value: "cassandra-0.cassandra.default.svc.cluster.local"
+ value: "cassandra-0.cassandra.clover-system.svc.cluster.local"
- name: CASSANDRA_CLUSTER_NAME
value: "MyCassandraDemo"
- name: CASSANDRA_DC