summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStephen Wong <stephen.kf.wong@gmail.com>2018-10-21 06:07:07 +0000
committerGerrit Code Review <gerrit@opnfv.org>2018-10-21 06:07:07 +0000
commitb713d2f3fee2d8164bf16c7249d75c3da827005b (patch)
tree35ccdd08739634e08d749606f433541583a9d290
parent72f00b63cf5cc1a53343cafcdd2b26be31d5ed9e (diff)
parentc36f82e76df1f2f506f770935093828d3d573e6b (diff)
Merge "Improve data ingestion reliability and functionality"
-rw-r--r--clover/collector/db/cassops.py54
-rw-r--r--clover/collector/db/redisops.py14
-rw-r--r--clover/collector/docker/Dockerfile9
-rw-r--r--clover/collector/grpc/collector_client.py6
-rw-r--r--clover/collector/grpc/collector_server.py2
-rw-r--r--clover/collector/process/collect.py115
-rw-r--r--clover/collector/yaml/manifest.template2
-rw-r--r--clover/tools/yaml/cassandra.yaml12
8 files changed, 128 insertions, 86 deletions
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py
index 6553cff..0bc9d84 100644
--- a/clover/collector/db/cassops.py
+++ b/clover/collector/db/cassops.py
@@ -9,7 +9,7 @@ from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
import logging
-CASSANDRA_HOSTS = ['cassandra.default']
+CASSANDRA_HOSTS = ['cassandra.clover-system']
class CassandraOps:
@@ -57,13 +57,18 @@ class CassandraOps:
spanid text,
traceid text,
duration int,
- start_time int,
+ start_time timestamp,
processid text,
operation_name text,
node_id text,
http_url text,
+ user_agent text,
+ request_size text,
+ response_size text,
+ status_code text,
upstream_cluster text,
- PRIMARY KEY (spanid, traceid)
+ insert_time timestamp,
+ PRIMARY KEY (traceid, spanid)
)
""")
@@ -82,11 +87,18 @@ class CassandraOps:
def set_prepared(self):
self.session.set_keyspace(self.keyspace)
- self.insert_tracing_stmt = self.session.prepare(
+ self.insert_span_stmt = self.session.prepare(
"""
INSERT INTO spans (spanid, traceid, duration, operation_name,
- node_id, http_url, upstream_cluster)
- VALUES (?, ?, ?, ?, ?, ?, ?)
+ node_id, http_url, upstream_cluster, start_time, user_agent,
+ request_size, response_size, status_code, insert_time)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now()))
+ """
+ )
+ self.insert_trace_stmt = self.session.prepare(
+ """
+ INSERT INTO traces (traceid, processes)
+ VALUES (?, ?)
"""
)
self.insert_metric_stmt = self.session.prepare(
@@ -103,31 +115,31 @@ class CassandraOps:
def execute_batch(self):
self.session.execute(self.batch)
- def insert_tracing(self, table, traceid, s, tags):
+ def insert_span(self, traceid, s, tags):
self.session.set_keyspace(self.keyspace)
if 'upstream_cluster' not in tags:
- logging.debug('NO UPSTREAM_CLUSTER KEY')
+ # logging.debug('NO UPSTREAM_CLUSTER KEY')
tags['upstream_cluster'] = 'none'
try:
- self.batch.add(self.insert_tracing_stmt,
+ self.batch.add(self.insert_span_stmt,
(s['spanID'], traceid, s['duration'],
s['operationName'], tags['node_id'],
- tags['http.url'], tags['upstream_cluster']))
+ tags['http.url'], tags['upstream_cluster'],
+ int(str(s['startTime'])[0:13]), tags['user_agent'],
+ tags['request_size'], tags['response_size'],
+ tags['http.status_code']))
+ except KeyError as e:
+ logging.debug('Insert span error: {}, Tags: {}'.format(e, tags))
except Exception as e:
- logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid,
- s['duration'], s['operationName'], tags['node_id'],
- tags['http.url'], tags['upstream_cluster']))
- logging.debug(e)
+ logging.debug('Insert span error: {}'.format(e))
+ logging.debug('Tags: {}'.format(tags))
+ logging.debug('Span toplevel: {}'.format(s))
+ logging.debug(
+ 'startTime: {}'.format(int(str(s['startTime'])[0:13])))
def insert_trace(self, traceid, processes):
self.session.set_keyspace(self.keyspace)
- self.session.execute(
- """
- INSERT INTO traces (traceid, processes)
- VALUES (%s, %s)
- """,
- (traceid, processes)
- )
+ self.batch.add(self.insert_trace_stmt, (traceid, processes))
def insert_metric(self, m_name, m_value, m_time, service):
self.session.set_keyspace(self.keyspace)
diff --git a/clover/collector/db/redisops.py b/clover/collector/db/redisops.py
index e80c417..24fbeb9 100644
--- a/clover/collector/db/redisops.py
+++ b/clover/collector/db/redisops.py
@@ -8,8 +8,7 @@
import redis
import logging
-REDIS_HOST = 'redis'
-# REDIS_HOST = '10.244.0.85'
+REDIS_HOST = 'redis.default'
class RedisOps:
@@ -27,11 +26,16 @@ class RedisOps:
for s in service_names:
self.r.sadd(skey, s)
+ def set_tracing_services(self, services, skey='tracing_services'):
+ self.r.delete(skey)
+ for s in services:
+ self.r.sadd(skey, s)
+
def init_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'):
- metric_prefixes = ['envoy_cluster_out_', 'envoy_cluster_in_']
+ metric_prefixes = ['envoy_cluster_outbound_', 'envoy_cluster_inbound_']
metric_suffixes = [
- '_default_svc_cluster_local_http_internal_upstream_rq_2xx',
- '_default_svc_cluster_local_http_upstream_cx_active']
+ '_default_svc_cluster_local_upstream_rq_2xx',
+ '_default_svc_cluster_local_upstream_cx_active']
for p in metric_prefixes:
self.r.sadd(pkey, p)
for s in metric_suffixes:
diff --git a/clover/collector/docker/Dockerfile b/clover/collector/docker/Dockerfile
index 1714420..7b6effd 100644
--- a/clover/collector/docker/Dockerfile
+++ b/clover/collector/docker/Dockerfile
@@ -16,15 +16,6 @@ ENV CLOVER_REPO_DIR="${REPOS_DIR}/clover"
RUN python -m pip install cassandra-driver redis
# Set work directory
-WORKDIR ${CLOVER_REPO_DIR}
-
-COPY /process clover/collector/process
-COPY /grpc clover/collector/grpc
-COPY /db clover/collector/db
-COPY __init__.py clover/collector/__init__.py
-
-RUN pip install .
-
WORKDIR "${CLOVER_REPO_DIR}/clover/collector"
CMD ./process/grpc_process.sh no_schema_init
diff --git a/clover/collector/grpc/collector_client.py b/clover/collector/grpc/collector_client.py
index b9e9f67..65ff2ff 100644
--- a/clover/collector/grpc/collector_client.py
+++ b/clover/collector/grpc/collector_client.py
@@ -55,7 +55,7 @@ def get_podip(pod_name):
def init_visibility(stub):
try:
- cassandra_hosts = pickle.dumps(['cassandra.default'])
+ cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
response = stub.InitVisibility(collector_pb2.ConfigCassandra(
cassandra_hosts=cassandra_hosts, cassandra_port=9042))
except Exception as e:
@@ -65,7 +65,7 @@ def init_visibility(stub):
def clean_visibility(stub):
try:
- cassandra_hosts = pickle.dumps(['cassandra.default'])
+ cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
schemas = pickle.dumps(['spans', 'traces', 'metrics'])
response = stub.TruncateVisibility(collector_pb2.Schemas(
schemas=schemas, cassandra_hosts=cassandra_hosts,
@@ -77,7 +77,7 @@ def clean_visibility(stub):
def start_collector(stub):
try:
- cassandra_hosts = pickle.dumps(['cassandra.default'])
+ cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
response = stub.StartCollector(collector_pb2.ConfigCollector(
t_port='16686', t_host='jaeger-deployment.istio-system',
m_port='9090', m_host='prometheus.istio-system',
diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py
index c2eb221..a10078e 100644
--- a/clover/collector/grpc/collector_server.py
+++ b/clover/collector/grpc/collector_server.py
@@ -29,7 +29,7 @@ class Controller(collector_pb2_grpc.ControllerServicer):
level=logging.DEBUG)
self.collector = 0
if init_visibility == 'set_schemas':
- cassandra_hosts = pickle.dumps(['cassandra.default'])
+ cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
self.InitVisibility(collector_pb2.ConfigCassandra(
cassandra_port=9042, cassandra_hosts=cassandra_hosts), "")
diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py
index d8beb49..3d9df8a 100644
--- a/clover/collector/process/collect.py
+++ b/clover/collector/process/collect.py
@@ -16,19 +16,25 @@ import argparse
import logging
import ast
-TRACING_SERVICES = ['istio-ingress']
TRACING_PORT = "16686"
MONITORING_PORT = "9090"
CASSANDRA_PORT = 9042 # Provide as integer
MONITORING_HOST = "prometheus.istio-system"
-TRACING_HOST = "jaeger-deployment.istio-system"
-CASSANDRA_HOSTS = ['cassandra.default']
+TRACING_HOST = "tracing.istio-system"
+CASSANDRA_HOSTS = ['cassandra.clover-system']
class Collector:
def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts):
- logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+
+ # logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+ logging.basicConfig(filename='collector.log', level=logging.ERROR)
+ # logging.getLogger("requests").setLevel(logging.DEBUG)
+ logging.getLogger("requests").setLevel(logging.ERROR)
+ # logging.getLogger("urllib3").setLevel(logging.DEBUG)
+ logging.getLogger("urllib3").setLevel(logging.ERROR)
+
try:
self.t = Tracing(t_host, t_port, '', False)
monitoring_url = "http://{}:{}".format(m_host, m_port)
@@ -40,63 +46,89 @@ class Collector:
logging.debug(e)
# Toplevel tracing retrieval and batch insert
- def get_tracing(self, services, time_back=20):
- self.c.set_batch()
- for service in services:
- traces = self.t.getTraces(service, time_back)
- try:
- self.set_tracing(traces)
- except Exception as e:
- logging.debug(e)
- self.c.execute_batch()
+ def get_tracing(self, time_back=300):
+ try:
+ services = self.r.get_services()
+ for service in services:
+ traces = self.t.getTraces(service.replace("_", "-"), time_back,
+ '20000')
+ try:
+ self.set_tracing(traces)
+ except Exception as e:
+ logging.debug(e)
+
+ # Update list of available services from tracing
+ services = self.t.getServices()
+ self.r.set_tracing_services(services)
+ except Exception as e:
+ logging.debug(e)
# Insert to cassandra visibility traces and spans tables
def set_tracing(self, trace):
for traces in trace['data']:
+ self.c.set_batch()
for spans in traces['spans']:
+ try:
span = {}
span['spanID'] = spans['spanID']
span['duration'] = spans['duration']
span['startTime'] = spans['startTime']
span['operationName'] = spans['operationName']
+
tag = {}
for tags in spans['tags']:
tag[tags['key']] = tags['value']
- self.c.insert_tracing('spans', traces['traceID'],
- span, tag)
+ self.c.insert_span(traces['traceID'], span, tag)
+ except Exception as e:
+ logging.debug("spans loop")
+ logging.debug(e)
+
process_list = []
for p in traces['processes']:
process_list.append(p)
service_names = []
for pname in process_list:
service_names.append(traces['processes'][pname]['serviceName'])
- self.c.insert_trace(traces['traceID'], service_names)
+ try:
+ self.c.insert_trace(traces['traceID'], service_names)
+ self.c.execute_batch()
+ except Exception as e:
+ logging.debug(e)
# Insert to cassandra visibility metrics table
def get_monitoring(self):
- # Fetch collector service/metric lists from redis
- service_names = self.r.get_services()
- metric_prefixes, metric_suffixes = self.r.get_metrics()
-
- self.c.set_batch()
- for sname in service_names:
- for prefix in metric_prefixes:
- for suffix in metric_suffixes:
- try:
- metric_name = prefix + sname + suffix
- query_params = {
- "type": "instant",
- "query": metric_name
- }
- data = self.m.query(query_params)
- m_value = data['data']['result'][0]['value'][1]
- m_time = data['data']['result'][0]['value'][0]
- mn = data['data']['result'][0]['metric']['__name__']
- self.c.insert_metric(mn, m_value, str(m_time), sname)
- except Exception as e:
- logging.debug(e)
- self.c.execute_batch()
+ try:
+ # Fetch collector service/metric lists from redis
+ service_names = self.r.get_services()
+ metric_prefixes, metric_suffixes = self.r.get_metrics()
+
+ self.c.set_batch()
+ for sname in service_names:
+ for prefix in metric_prefixes:
+ for suffix in metric_suffixes:
+ try:
+ metric_name = prefix + sname + suffix
+ query_params = {
+ "type": "instant",
+ "query": metric_name
+ }
+ data = self.m.query(query_params)
+ m_value = data['data']['result'][0]['value'][1]
+ m_time = data['data']['result'][0]['value'][0]
+ mn = data[
+ 'data']['result'][0]['metric']['__name__']
+ self.c.insert_metric(
+ mn, m_value, str(m_time), sname)
+
+ # Add to redis temporarily
+ self.r.r.set(mn, m_value)
+
+ except Exception as e:
+ logging.debug(e)
+ self.c.execute_batch()
+ except Exception as e:
+ logging.debug(e)
# TODO add batch retrieval for monitoring metrics
# query_range_param = {
@@ -124,11 +156,13 @@ def main(args):
loop = True
while loop:
try:
- c.get_tracing(args['t_services'])
+ c.get_tracing()
c.get_monitoring()
time.sleep(int(args['sinterval']))
except KeyboardInterrupt:
loop = False
+ except Exception as e:
+ logging.debug(e)
if __name__ == '__main__':
@@ -154,9 +188,6 @@ if __name__ == '__main__':
parser.add_argument(
'-c_port', default=CASSANDRA_PORT,
help='Port to access Cassandra cluster')
- parser.add_argument(
- '-t_services', default=TRACING_SERVICES,
- help='Collect services on this list of services')
args, unknown = parser.parse_known_args()
print(main(vars(args)))
diff --git a/clover/collector/yaml/manifest.template b/clover/collector/yaml/manifest.template
index c7aa3e7..795bd8f 100644
--- a/clover/collector/yaml/manifest.template
+++ b/clover/collector/yaml/manifest.template
@@ -5,6 +5,7 @@ metadata:
name: {{ deploy_name }}
labels:
app: {{ deploy_name }}
+ namespace: clover-system
spec:
template:
metadata:
@@ -27,6 +28,7 @@ metadata:
name: {{ deploy_name }}
labels:
app: {{ deploy_name }}
+ namespace: clover-system
spec:
ports:
- port: {{ grpc_port }}
diff --git a/clover/tools/yaml/cassandra.yaml b/clover/tools/yaml/cassandra.yaml
index 0206d75..dc1c46f 100644
--- a/clover/tools/yaml/cassandra.yaml
+++ b/clover/tools/yaml/cassandra.yaml
@@ -36,6 +36,7 @@ metadata:
labels:
app: cassandra
name: cassandra
+ namespace: clover-system
spec:
clusterIP: None
ports:
@@ -49,6 +50,7 @@ metadata:
name: cassandra
labels:
app: cassandra
+ namespace: clover-system
spec:
serviceName: cassandra
replicas: 1
@@ -76,18 +78,18 @@ spec:
name: cql
resources:
limits:
- cpu: "500m"
- memory: 1Gi
+ cpu: "1000m"
+ memory: 5Gi
requests:
- cpu: "500m"
- memory: 1Gi
+ cpu: "1000m"
+ memory: 5Gi
env:
- name: MAX_HEAP_SIZE
value: 512M
- name: HEAP_NEWSIZE
value: 100M
- name: CASSANDRA_SEEDS
- value: "cassandra-0.cassandra.default.svc.cluster.local"
+ value: "cassandra-0.cassandra.clover-system.svc.cluster.local"
- name: CASSANDRA_CLUSTER_NAME
value: "MyCassandraDemo"
- name: CASSANDRA_DC