summaryrefslogtreecommitdiffstats
path: root/clover/collector/db/cassops.py
diff options
context:
space:
mode:
Diffstat (limited to 'clover/collector/db/cassops.py')
-rw-r--r--clover/collector/db/cassops.py144
1 files changed, 144 insertions, 0 deletions
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py
new file mode 100644
index 0000000..6553cff
--- /dev/null
+++ b/clover/collector/db/cassops.py
@@ -0,0 +1,144 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+from cassandra.cluster import Cluster
+from cassandra.query import BatchStatement
+import logging
+
+CASSANDRA_HOSTS = ['cassandra.default']
+
+
+class CassandraOps:
+
+ def __init__(self, hosts, port=9042, keyspace='visibility'):
+ logging.basicConfig(filename='cassops.log',
+ level=logging.DEBUG)
+ cluster = Cluster(hosts, port=port)
+ self.session = cluster.connect()
+ self.keyspace = keyspace
+
+ def truncate(self, tables=['traces', 'metrics', 'spans']):
+ self.session.set_keyspace(self.keyspace)
+ try:
+ for table in tables:
+ self.session.execute("""
+ TRUNCATE %s
+ """ % table)
+ except Exception as e:
+ logging.debug(e)
+
+ def init_visibility(self):
+ try:
+ self.session.execute("""
+ CREATE KEYSPACE %s
+ WITH replication = { 'class': 'SimpleStrategy',
+ 'replication_factor': '1' }
+ """ % self.keyspace)
+ except Exception as e:
+ logging.debug(e)
+
+ self.session.set_keyspace(self.keyspace)
+
+ try:
+ self.session.execute("""
+ CREATE TABLE IF NOT EXISTS traces (
+ traceid text,
+ processes list<text>,
+ PRIMARY KEY (traceid)
+ )
+ """)
+
+ self.session.execute("""
+ CREATE TABLE IF NOT EXISTS spans (
+ spanid text,
+ traceid text,
+ duration int,
+ start_time int,
+ processid text,
+ operation_name text,
+ node_id text,
+ http_url text,
+ upstream_cluster text,
+ PRIMARY KEY (spanid, traceid)
+ )
+ """)
+
+ self.session.execute("""
+ CREATE TABLE IF NOT EXISTS metrics (
+ m_name text,
+ m_value text,
+ m_time text,
+ service text,
+ monitor_time timestamp,
+ PRIMARY KEY (m_name, monitor_time)
+ )
+ """)
+ except Exception as e:
+ logging.debug(e)
+
+ def set_prepared(self):
+ self.session.set_keyspace(self.keyspace)
+ self.insert_tracing_stmt = self.session.prepare(
+ """
+ INSERT INTO spans (spanid, traceid, duration, operation_name,
+ node_id, http_url, upstream_cluster)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ """
+ )
+ self.insert_metric_stmt = self.session.prepare(
+ """
+ INSERT INTO metrics
+ (m_name, m_value, m_time, service, monitor_time)
+ VALUES (?, ?, ?, ?, toTimestamp(now()))
+ """
+ )
+
+ def set_batch(self):
+ self.batch = BatchStatement()
+
+ def execute_batch(self):
+ self.session.execute(self.batch)
+
+ def insert_tracing(self, table, traceid, s, tags):
+ self.session.set_keyspace(self.keyspace)
+ if 'upstream_cluster' not in tags:
+ logging.debug('NO UPSTREAM_CLUSTER KEY')
+ tags['upstream_cluster'] = 'none'
+ try:
+ self.batch.add(self.insert_tracing_stmt,
+ (s['spanID'], traceid, s['duration'],
+ s['operationName'], tags['node_id'],
+ tags['http.url'], tags['upstream_cluster']))
+ except Exception as e:
+ logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid,
+ s['duration'], s['operationName'], tags['node_id'],
+ tags['http.url'], tags['upstream_cluster']))
+ logging.debug(e)
+
+ def insert_trace(self, traceid, processes):
+ self.session.set_keyspace(self.keyspace)
+ self.session.execute(
+ """
+ INSERT INTO traces (traceid, processes)
+ VALUES (%s, %s)
+ """,
+ (traceid, processes)
+ )
+
+ def insert_metric(self, m_name, m_value, m_time, service):
+ self.session.set_keyspace(self.keyspace)
+ self.batch.add(self.insert_metric_stmt,
+ (m_name, m_value, m_time, service))
+
+
+def main():
+ cass = CassandraOps(CASSANDRA_HOSTS)
+ cass.init_visibility()
+
+
+if __name__ == '__main__':
+ main()