summaryrefslogtreecommitdiffstats
path: root/clover/collector/grpc/collector_server.py
diff options
context:
space:
mode:
authorEddie Arrage <eddie.arrage@huawei.com>2018-05-09 18:33:55 +0000
committerEddie Arrage <eddie.arrage@huawei.com>2018-06-12 06:43:48 +0000
commitdbece18d19c3977019c6727fcbe7a436031666fe (patch)
tree8eda38ba5fc07f5afc82904a610dc0dad859a4a4 /clover/collector/grpc/collector_server.py
parent115d3c9ba4de194534cdf0be827c16e04e49951b (diff)
Initial commit for Clover Collector
- Added a container named clover-collector using clover container as a base with build script - GRPC server to manage collector process - Cassandra DB client interface to initialize visibility keyspace - Init messaging adds table schemas for tracing - traces & spans - Adds table for monitoring - metrics - Does not implement Cassandra server but developed using public Cassandra docker container - Collector process in simple loop that periodically fetches traces and monitoring data and inserts to Cassandra - not optimized for batch retrieval yet for monitoring - CLI interface added to collector process and used by GRPC server for configuration - Simple GRPC client script to test GRPC server and start/stop of collector process - Collector process can be configured with access for tracing, monitoring and Cassandra - Added a return value in monitoring query method - Added ability to truncate tracing, metrics and spans tables in cql - Added cql prepared statements and batch insert for metrics and spans - Align cql connection to cql deployment within k8s - Fix issue with cql host list using ast and collect process args with background argument - Added redis interface to accept service/metric list externally for monitoring (will work in conjunction with clover-controller) - Use k8s DNS names and default ports for monitoring, tracing and cassandra - Added yaml manifest renderer/template for collector Change-Id: I3e4353e28844c4ce9c185ff4638012b66c7fff67 Signed-off-by: Eddie Arrage <eddie.arrage@huawei.com>
Diffstat (limited to 'clover/collector/grpc/collector_server.py')
-rw-r--r--clover/collector/grpc/collector_server.py98
1 files changed, 98 insertions, 0 deletions
diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py
new file mode 100644
index 0000000..c2eb221
--- /dev/null
+++ b/clover/collector/grpc/collector_server.py
@@ -0,0 +1,98 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+
+from concurrent import futures
+from clover.collector.db.cassops import CassandraOps
+import time
+import sys
+import grpc
+import subprocess
+import pickle
+import logging
+import collector_pb2
+import collector_pb2_grpc
+
+
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+GRPC_PORT = '[::]:50054'
+
+
+class Controller(collector_pb2_grpc.ControllerServicer):
+
+ def __init__(self, init_visibility):
+ logging.basicConfig(filename='collector_server.log',
+ level=logging.DEBUG)
+ self.collector = 0
+ if init_visibility == 'set_schemas':
+ cassandra_hosts = pickle.dumps(['cassandra.default'])
+ self.InitVisibility(collector_pb2.ConfigCassandra(
+ cassandra_port=9042, cassandra_hosts=cassandra_hosts), "")
+
+ def StopCollector(self, r, context):
+ try:
+ subprocess.Popen.kill(self.collector)
+ msg = "Stopped collector on pid: {}".format(self.collector.pid)
+ except Exception as e:
+ logging.debug(e)
+ msg = "Failed to stop collector"
+ return collector_pb2.CollectorReply(message=msg)
+
+ def StartCollector(self, r, context):
+ try:
+ self.collector = subprocess.Popen(
+ ["python", "process/collect.py",
+ "-sinterval={}".format(r.sinterval),
+ "-c_port={}".format(r.c_port),
+ "-t_port={}".format(r.t_port), "-t_host={}".format(r.t_host),
+ "-m_port={}".format(r.m_port), "-m_host={}".format(r.m_host),
+ "-c_hosts={}".format(pickle.loads(r.c_hosts)), "&"],
+ shell=False)
+ msg = "Started collector on pid: {}".format(self.collector.pid)
+ except Exception as e:
+ logging.debug(e)
+ msg = e
+ return collector_pb2.CollectorReply(message=msg)
+
+ def InitVisibility(self, r, context):
+ try:
+ cass = CassandraOps(pickle.loads(r.cassandra_hosts),
+ r.cassandra_port)
+ cass.init_visibility()
+ msg = "Added visibility schemas in cassandra"
+ except Exception as e:
+ logging.debug(e)
+ msg = "Failed to initialize cassandra"
+ return collector_pb2.CollectorReply(message=msg)
+
+ def TruncateVisibility(self, r, context):
+ try:
+ cass = CassandraOps(pickle.loads(r.cassandra_hosts),
+ r.cassandra_port)
+ cass.truncate(pickle.loads(r.schemas))
+ msg = "Truncated visibility tables"
+ except Exception as e:
+ logging.debug(e)
+ msg = "Failed to truncate visibility"
+ return collector_pb2.CollectorReply(message=msg)
+
+
+def serve(init_visibility):
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ collector_pb2_grpc.add_ControllerServicer_to_server(
+ Controller(init_visibility), server)
+ server.add_insecure_port(GRPC_PORT)
+ server.start()
+ try:
+ while True:
+ time.sleep(_ONE_DAY_IN_SECONDS)
+ except KeyboardInterrupt:
+ server.stop(0)
+
+
+if __name__ == '__main__':
+ serve(sys.argv[1])