diff options
author | Eddie Arrage <eddie.arrage@huawei.com> | 2018-05-09 18:33:55 +0000 |
---|---|---|
committer | Eddie Arrage <eddie.arrage@huawei.com> | 2018-06-12 06:43:48 +0000 |
commit | dbece18d19c3977019c6727fcbe7a436031666fe (patch) | |
tree | 8eda38ba5fc07f5afc82904a610dc0dad859a4a4 /clover/collector/db/cassops.py | |
parent | 115d3c9ba4de194534cdf0be827c16e04e49951b (diff) |
Initial commit for Clover Collector
- Added a container named clover-collector using clover
container as a base with build script
- GRPC server to manage collector process
- Cassandra DB client interface to initialize visibility keyspace
- Init messaging adds table schemas for tracing - traces & spans
- Adds table for monitoring - metrics
- Does not implement Cassandra server but developed using
public Cassandra docker container
- Collector process in simple loop that periodically fetches
traces and monitoring data and inserts to Cassandra - not optimized
for batch retrieval yet for monitoring
- CLI interface added to collector process and used
by GRPC server for configuration
- Simple GRPC client script to test GRPC server and start/stop
of collector process
- Collector process can be configured with access for tracing,
monitoring and Cassandra
- Added a return value in monitoring query method
- Added ability to truncate tracing, metrics and spans tables
in cql
- Added cql prepared statements and batch insert for metrics
and spans
- Align cql connection to cql deployment within k8s
- Fix issue with cql host list using ast and collect process
args with background argument
- Added redis interface to accept service/metric list
externally for monitoring (will work in conjunction
with clover-controller)
- Use k8s DNS names and default ports for monitoring, tracing
and cassandra
- Added yaml manifest renderer/template for collector
Change-Id: I3e4353e28844c4ce9c185ff4638012b66c7fff67
Signed-off-by: Eddie Arrage <eddie.arrage@huawei.com>
Diffstat (limited to 'clover/collector/db/cassops.py')
-rw-r--r-- | clover/collector/db/cassops.py | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py new file mode 100644 index 0000000..6553cff --- /dev/null +++ b/clover/collector/db/cassops.py @@ -0,0 +1,144 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +from cassandra.cluster import Cluster +from cassandra.query import BatchStatement +import logging + +CASSANDRA_HOSTS = ['cassandra.default'] + + +class CassandraOps: + + def __init__(self, hosts, port=9042, keyspace='visibility'): + logging.basicConfig(filename='cassops.log', + level=logging.DEBUG) + cluster = Cluster(hosts, port=port) + self.session = cluster.connect() + self.keyspace = keyspace + + def truncate(self, tables=['traces', 'metrics', 'spans']): + self.session.set_keyspace(self.keyspace) + try: + for table in tables: + self.session.execute(""" + TRUNCATE %s + """ % table) + except Exception as e: + logging.debug(e) + + def init_visibility(self): + try: + self.session.execute(""" + CREATE KEYSPACE %s + WITH replication = { 'class': 'SimpleStrategy', + 'replication_factor': '1' } + """ % self.keyspace) + except Exception as e: + logging.debug(e) + + self.session.set_keyspace(self.keyspace) + + try: + self.session.execute(""" + CREATE TABLE IF NOT EXISTS traces ( + traceid text, + processes list<text>, + PRIMARY KEY (traceid) + ) + """) + + self.session.execute(""" + CREATE TABLE IF NOT EXISTS spans ( + spanid text, + traceid text, + duration int, + start_time int, + processid text, + operation_name text, + node_id text, + http_url text, + upstream_cluster text, + PRIMARY KEY (spanid, traceid) + ) + """) + + self.session.execute(""" + CREATE TABLE IF NOT EXISTS metrics ( + m_name text, + m_value text, + m_time text, + service text, + monitor_time timestamp, + PRIMARY KEY (m_name, monitor_time) + ) + """) + except Exception as e: + logging.debug(e) + + def set_prepared(self): + self.session.set_keyspace(self.keyspace) + self.insert_tracing_stmt = self.session.prepare( + """ + INSERT INTO spans (spanid, traceid, duration, operation_name, + node_id, http_url, upstream_cluster) + VALUES (?, ?, ?, ?, ?, ?, ?) + """ + ) + self.insert_metric_stmt = self.session.prepare( + """ + INSERT INTO metrics + (m_name, m_value, m_time, service, monitor_time) + VALUES (?, ?, ?, ?, toTimestamp(now())) + """ + ) + + def set_batch(self): + self.batch = BatchStatement() + + def execute_batch(self): + self.session.execute(self.batch) + + def insert_tracing(self, table, traceid, s, tags): + self.session.set_keyspace(self.keyspace) + if 'upstream_cluster' not in tags: + logging.debug('NO UPSTREAM_CLUSTER KEY') + tags['upstream_cluster'] = 'none' + try: + self.batch.add(self.insert_tracing_stmt, + (s['spanID'], traceid, s['duration'], + s['operationName'], tags['node_id'], + tags['http.url'], tags['upstream_cluster'])) + except Exception as e: + logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid, + s['duration'], s['operationName'], tags['node_id'], + tags['http.url'], tags['upstream_cluster'])) + logging.debug(e) + + def insert_trace(self, traceid, processes): + self.session.set_keyspace(self.keyspace) + self.session.execute( + """ + INSERT INTO traces (traceid, processes) + VALUES (%s, %s) + """, + (traceid, processes) + ) + + def insert_metric(self, m_name, m_value, m_time, service): + self.session.set_keyspace(self.keyspace) + self.batch.add(self.insert_metric_stmt, + (m_name, m_value, m_time, service)) + + +def main(): + cass = CassandraOps(CASSANDRA_HOSTS) + cass.init_visibility() + + +if __name__ == '__main__': + main() |