From dbece18d19c3977019c6727fcbe7a436031666fe Mon Sep 17 00:00:00 2001 From: Eddie Arrage Date: Wed, 9 May 2018 18:33:55 +0000 Subject: Initial commit for Clover Collector - Added a container named clover-collector using clover container as a base with build script - GRPC server to manage collector process - Cassandra DB client interface to initialize visibility keyspace - Init messaging adds table schemas for tracing - traces & spans - Adds table for monitoring - metrics - Does not implement Cassandra server but developed using public Cassandra docker container - Collector process in simple loop that periodically fetches traces and monitoring data and inserts to Cassandra - not optimized for batch retrieval yet for monitoring - CLI interface added to collector process and used by GRPC server for configuration - Simple GRPC client script to test GRPC server and start/stop of collector process - Collector process can be configured with access for tracing, monitoring and Cassandra - Added a return value in monitoring query method - Added ability to truncate tracing, metrics and spans tables in cql - Added cql prepared statements and batch insert for metrics and spans - Align cql connection to cql deployment within k8s - Fix issue with cql host list using ast and collect process args with background argument - Added redis interface to accept service/metric list externally for monitoring (will work in conjunction with clover-controller) - Use k8s DNS names and default ports for monitoring, tracing and cassandra - Added yaml manifest renderer/template for collector Change-Id: I3e4353e28844c4ce9c185ff4638012b66c7fff67 Signed-off-by: Eddie Arrage --- clover/collector/db/cassops.py | 144 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 clover/collector/db/cassops.py (limited to 'clover/collector/db/cassops.py') diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py new file mode 100644 index 
# Copyright (c) Authors of Clover
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
import logging

CASSANDRA_HOSTS = ['cassandra.default']


class CassandraOps:
    """Thin wrapper around a Cassandra session for the visibility keyspace.

    Creates the keyspace and the ``traces``, ``spans`` and ``metrics``
    tables, prepares the insert statements for spans and metrics, and
    queues span/metric rows on a batch that is sent with
    ``execute_batch()``.

    Typical call order: ``init_visibility()`` once, then ``set_prepared()``,
    then ``set_batch()`` followed by any number of ``insert_tracing()`` /
    ``insert_metric()`` calls and a final ``execute_batch()``.
    """

    def __init__(self, hosts, port=9042, keyspace='visibility'):
        """Connect to the cluster.

        Args:
            hosts: list of contact points handed to ``Cluster``.
            port: port handed to ``Cluster``.
            keyspace: keyspace name used by every other method.

        Side effect: configures the root logger to write DEBUG output to
        ``cassops.log`` (``logging.basicConfig`` is a no-op if the root
        logger already has handlers).
        """
        logging.basicConfig(filename='cassops.log',
                            level=logging.DEBUG)
        # Keep the cluster handle so callers can shut the connection down;
        # previously it was dropped as soon as __init__ returned.
        self.cluster = Cluster(hosts, port=port)
        self.session = self.cluster.connect()
        self.keyspace = keyspace

    def truncate(self, tables=('traces', 'metrics', 'spans')):
        """Remove all rows from each table named in ``tables``.

        Best effort: the first failure is logged and the remaining tables
        are left untouched; no exception propagates to the caller.

        Args:
            tables: iterable of table names inside the keyspace. The default
                is an immutable tuple so it cannot be altered between calls.
        """
        self.session.set_keyspace(self.keyspace)
        try:
            for table in tables:
                # Identifiers cannot be bound as query parameters, so the
                # table name is interpolated; pass trusted names only.
                self.session.execute("""
                    TRUNCATE %s
                    """ % table)
        except Exception:
            logging.exception('truncate failed')

    def init_visibility(self):
        """Create the keyspace and the traces, spans and metrics tables.

        Every statement uses ``IF NOT EXISTS`` so the method can be run
        repeatedly. Failures are logged and swallowed.
        """
        try:
            # IF NOT EXISTS makes re-runs a no-op instead of relying on the
            # "already exists" error being swallowed below.
            self.session.execute("""
                CREATE KEYSPACE IF NOT EXISTS %s
                WITH replication = { 'class': 'SimpleStrategy',
                                     'replication_factor': '1' }
                """ % self.keyspace)
        except Exception:
            logging.exception('keyspace creation failed')

        self.session.set_keyspace(self.keyspace)

        try:
            # A CQL collection column must declare its element type; a bare
            # "list" is rejected by the server.
            # NOTE(review): element type text is assumed - confirm against
            # what callers pass as `processes` to insert_trace().
            self.session.execute("""
                CREATE TABLE IF NOT EXISTS traces (
                    traceid text,
                    processes list<text>,
                    PRIMARY KEY (traceid)
                )
                """)

            self.session.execute("""
                CREATE TABLE IF NOT EXISTS spans (
                    spanid text,
                    traceid text,
                    duration int,
                    start_time int,
                    processid text,
                    operation_name text,
                    node_id text,
                    http_url text,
                    upstream_cluster text,
                    PRIMARY KEY (spanid, traceid)
                )
                """)

            self.session.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                    m_name text,
                    m_value text,
                    m_time text,
                    service text,
                    monitor_time timestamp,
                    PRIMARY KEY (m_name, monitor_time)
                )
                """)
        except Exception:
            logging.exception('table creation failed')

    def set_prepared(self):
        """Prepare the span and metric insert statements.

        Must be called before ``insert_tracing()`` or ``insert_metric()``.
        The span statement does not populate the ``start_time`` and
        ``processid`` columns; ``monitor_time`` in the metric statement is
        filled in by the server with the current time.
        """
        self.session.set_keyspace(self.keyspace)
        self.insert_tracing_stmt = self.session.prepare(
            """
            INSERT INTO spans (spanid, traceid, duration, operation_name,
            node_id, http_url, upstream_cluster)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """
        )
        self.insert_metric_stmt = self.session.prepare(
            """
            INSERT INTO metrics
            (m_name, m_value, m_time, service, monitor_time)
            VALUES (?, ?, ?, ?, toTimestamp(now()))
            """
        )

    def set_batch(self):
        """Start a new, empty batch for subsequent inserts."""
        self.batch = BatchStatement()

    def execute_batch(self):
        """Send the current batch to Cassandra.

        The batch is not cleared afterwards; call ``set_batch()`` before
        queuing the next group of rows.
        """
        self.session.execute(self.batch)

    def insert_tracing(self, table, traceid, s, tags):
        """Queue one span row on the current batch.

        Args:
            table: unused; kept for interface compatibility.
            traceid: id of the trace the span belongs to.
            s: span mapping; reads ``spanID``, ``duration`` and
                ``operationName``.
            tags: span tag mapping; reads ``node_id``, ``http.url`` and
                ``upstream_cluster``. If ``upstream_cluster`` is missing it
                is set to ``'none'`` on the mapping passed in.

        A span with missing fields is logged and skipped rather than
        raising.
        """
        self.session.set_keyspace(self.keyspace)
        if 'upstream_cluster' not in tags:
            logging.debug('NO UPSTREAM_CLUSTER KEY')
            tags['upstream_cluster'] = 'none'
        try:
            self.batch.add(self.insert_tracing_stmt,
                           (s['spanID'], traceid, s['duration'],
                            s['operationName'], tags['node_id'],
                            tags['http.url'], tags['upstream_cluster']))
        except Exception as e:
            # Use .get() here: the usual failure is a missing key, and
            # indexing again inside the handler would re-raise the very
            # error this handler is meant to absorb.
            logging.debug('{} {} {} {} {} {} {}'.format(
                s.get('spanID'), traceid, s.get('duration'),
                s.get('operationName'), tags.get('node_id'),
                tags.get('http.url'), tags.get('upstream_cluster')))
            logging.debug(e)

    def insert_trace(self, traceid, processes):
        """Insert one row into ``traces`` immediately (not batched).

        Args:
            traceid: trace id, the table's primary key.
            processes: value stored in the ``processes`` list column.
        """
        self.session.set_keyspace(self.keyspace)
        self.session.execute(
            """
            INSERT INTO traces (traceid, processes)
            VALUES (%s, %s)
            """,
            (traceid, processes)
        )

    def insert_metric(self, m_name, m_value, m_time, service):
        """Queue one metric row on the current batch.

        Args:
            m_name: metric name (partition key).
            m_value: metric value, stored as text.
            m_time: metric time, stored as text.
            service: name of the service the metric belongs to.
        """
        self.session.set_keyspace(self.keyspace)
        self.batch.add(self.insert_metric_stmt,
                       (m_name, m_value, m_time, service))


def main():
    """Connect to the default hosts and create the visibility schema."""
    cass = CassandraOps(CASSANDRA_HOSTS)
    cass.init_visibility()


if __name__ == '__main__':
    main()