diff options
author | Eddie Arrage <eddie.arrage@huawei.com> | 2018-05-09 18:33:55 +0000 |
---|---|---|
committer | Eddie Arrage <eddie.arrage@huawei.com> | 2018-06-12 06:43:48 +0000 |
commit | dbece18d19c3977019c6727fcbe7a436031666fe (patch) | |
tree | 8eda38ba5fc07f5afc82904a610dc0dad859a4a4 /clover/collector/grpc/collector_server.py | |
parent | 115d3c9ba4de194534cdf0be827c16e04e49951b (diff) |
Initial commit for Clover Collector
- Added a container named clover-collector using clover
container as a base with build script
- GRPC server to manage collector process
- Cassandra DB client interface to initialize visibility keyspace
- Init messaging adds table schemas for tracing - traces & spans
- Adds table for monitoring - metrics
- Does not implement Cassandra server but developed using
public Cassandra docker container
- Collector process in simple loop that periodically fetches
traces and monitoring data and inserts to Cassandra - not optimized
for batch retrieval yet for monitoring
- CLI interface added to collector process and used
by GRPC server for configuration
- Simple GRPC client script to test GRPC server and start/stop
of collector process
- Collector process can be configured with access for tracing,
monitoring and Cassandra
- Added a return value in monitoring query method
- Added ability to truncate tracing, metrics and spans tables
in cql
- Added cql prepared statements and batch insert for metrics
and spans
- Align cql connection to cql deployment within k8s
- Fix issue with cql host list using ast and collect process
args with background argument
- Added redis interface to accept service/metric list
externally for monitoring (will work in conjunction
with clover-controller)
- Use k8s DNS names and default ports for monitoring, tracing
and cassandra
- Added yaml manifest renderer/template for collector
Change-Id: I3e4353e28844c4ce9c185ff4638012b66c7fff67
Signed-off-by: Eddie Arrage <eddie.arrage@huawei.com>
Diffstat (limited to 'clover/collector/grpc/collector_server.py')
-rw-r--r-- | clover/collector/grpc/collector_server.py | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py new file mode 100644 index 0000000..c2eb221 --- /dev/null +++ b/clover/collector/grpc/collector_server.py @@ -0,0 +1,98 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + + +from concurrent import futures +from clover.collector.db.cassops import CassandraOps +import time +import sys +import grpc +import subprocess +import pickle +import logging +import collector_pb2 +import collector_pb2_grpc + + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 +GRPC_PORT = '[::]:50054' + + +class Controller(collector_pb2_grpc.ControllerServicer): + + def __init__(self, init_visibility): + logging.basicConfig(filename='collector_server.log', + level=logging.DEBUG) + self.collector = 0 + if init_visibility == 'set_schemas': + cassandra_hosts = pickle.dumps(['cassandra.default']) + self.InitVisibility(collector_pb2.ConfigCassandra( + cassandra_port=9042, cassandra_hosts=cassandra_hosts), "") + + def StopCollector(self, r, context): + try: + subprocess.Popen.kill(self.collector) + msg = "Stopped collector on pid: {}".format(self.collector.pid) + except Exception as e: + logging.debug(e) + msg = "Failed to stop collector" + return collector_pb2.CollectorReply(message=msg) + + def StartCollector(self, r, context): + try: + self.collector = subprocess.Popen( + ["python", "process/collect.py", + "-sinterval={}".format(r.sinterval), + "-c_port={}".format(r.c_port), + "-t_port={}".format(r.t_port), "-t_host={}".format(r.t_host), + "-m_port={}".format(r.m_port), "-m_host={}".format(r.m_host), + "-c_hosts={}".format(pickle.loads(r.c_hosts)), "&"], + shell=False) + msg = "Started collector on pid: {}".format(self.collector.pid) + except Exception as e: + logging.debug(e) + msg = e + return collector_pb2.CollectorReply(message=msg) + + def InitVisibility(self, r, context): + try: + cass = CassandraOps(pickle.loads(r.cassandra_hosts), + r.cassandra_port) + cass.init_visibility() + msg = "Added visibility schemas in cassandra" + except Exception as e: + logging.debug(e) + msg = "Failed to initialize cassandra" + return collector_pb2.CollectorReply(message=msg) + + def TruncateVisibility(self, r, context): + try: + cass = CassandraOps(pickle.loads(r.cassandra_hosts), + r.cassandra_port) + cass.truncate(pickle.loads(r.schemas)) + msg = "Truncated visibility tables" + except Exception as e: + logging.debug(e) + msg = "Failed to truncate visibility" + return collector_pb2.CollectorReply(message=msg) + + +def serve(init_visibility): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + collector_pb2_grpc.add_ControllerServicer_to_server( + Controller(init_visibility), server) + server.add_insecure_port(GRPC_PORT) + server.start() + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(0) + + +if __name__ == '__main__': + serve(sys.argv[1]) |