diff options
Diffstat (limited to 'clover/collector/grpc')
-rw-r--r-- | clover/collector/grpc/__init__.py | 0 | ||||
-rwxr-xr-x | clover/collector/grpc/build_proto.sh | 11 | ||||
-rw-r--r-- | clover/collector/grpc/collector.proto | 45 | ||||
-rw-r--r-- | clover/collector/grpc/collector_client.py | 105 | ||||
-rw-r--r-- | clover/collector/grpc/collector_pb2.py | 300 | ||||
-rw-r--r-- | clover/collector/grpc/collector_pb2_grpc.py | 97 | ||||
-rw-r--r-- | clover/collector/grpc/collector_server.py | 98 |
7 files changed, 656 insertions, 0 deletions
diff --git a/clover/collector/grpc/__init__.py b/clover/collector/grpc/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/clover/collector/grpc/__init__.py diff --git a/clover/collector/grpc/build_proto.sh b/clover/collector/grpc/build_proto.sh new file mode 100755 index 0000000..44467ad --- /dev/null +++ b/clover/collector/grpc/build_proto.sh @@ -0,0 +1,11 @@ +#!/bin/bash +# +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# + +python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. collector.proto diff --git a/clover/collector/grpc/collector.proto b/clover/collector/grpc/collector.proto new file mode 100644 index 0000000..fc8b636 --- /dev/null +++ b/clover/collector/grpc/collector.proto @@ -0,0 +1,45 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +syntax = "proto3"; + +package collector; + +// The controller service definition. +service Controller { + + rpc StopCollector (ConfigCollector) returns (CollectorReply) {} + rpc StartCollector (ConfigCollector) returns (CollectorReply) {} + rpc InitVisibility (ConfigCassandra) returns (CollectorReply) {} + rpc TruncateVisibility (Schemas) returns (CollectorReply) {} +} + +message ConfigCassandra { + string cassandra_hosts = 1; + int32 cassandra_port = 2; +} + +message ConfigCollector { + string t_port = 1; + string t_host = 2; + string m_port = 3; + string m_host = 4; + string c_port = 5; + string c_hosts = 6; + string sinterval = 7; +} + +message Schemas { + string schemas = 1; + string cassandra_hosts = 2; + int32 cassandra_port = 3; + +} + +message CollectorReply { + string message = 1; +} diff --git a/clover/collector/grpc/collector_client.py b/clover/collector/grpc/collector_client.py new file mode 100644 index 0000000..b9e9f67 --- /dev/null +++ b/clover/collector/grpc/collector_client.py @@ -0,0 +1,105 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +from __future__ import print_function +from kubernetes import client, config + +import grpc +import argparse +import pickle + +import collector_pb2 +import collector_pb2_grpc + +# This is a basic client script to test server GRPC messaging +# TODO improve interface overall + + +def run(args, grpc_port='50054'): + pod_ip = get_podip('clover-collector') + if pod_ip == '': + return "Can not find service: {}".format(args['service_name']) + collector_grpc = pod_ip + ':' + grpc_port + channel = grpc.insecure_channel(collector_grpc) + stub = collector_pb2_grpc.ControllerStub(channel) + + if args['cmd'] == 'init': + return init_visibility(stub) + elif args['cmd'] == 'start': + return start_collector(stub) + elif args['cmd'] == 'stop': + return stop_collector(stub) + elif args['cmd'] == 'clean': + return clean_visibility(stub) + else: + return "Invalid command: {}".format(args['cmd']) + + +def get_podip(pod_name): + ip = '' + if pod_name != '': + config.load_kube_config() + v1 = client.CoreV1Api() + ret = v1.list_pod_for_all_namespaces(watch=False) + for i in ret.items: + if i.metadata.name.lower().find(pod_name.lower()) != -1: + print("Pod IP: {}".format(i.status.pod_ip)) + ip = i.status.pod_ip + return str(ip) + return str(ip) + + +def init_visibility(stub): + try: + cassandra_hosts = pickle.dumps(['cassandra.default']) + response = stub.InitVisibility(collector_pb2.ConfigCassandra( + cassandra_hosts=cassandra_hosts, cassandra_port=9042)) + except Exception as e: + return e + return response.message + + +def clean_visibility(stub): + try: + cassandra_hosts = pickle.dumps(['cassandra.default']) + schemas = pickle.dumps(['spans', 'traces', 'metrics']) + response = stub.TruncateVisibility(collector_pb2.Schemas( + schemas=schemas, cassandra_hosts=cassandra_hosts, + cassandra_port=9042)) + except Exception as e: + return e + return response.message + + +def start_collector(stub): + try: + cassandra_hosts = pickle.dumps(['cassandra.default']) + response = stub.StartCollector(collector_pb2.ConfigCollector( + t_port='16686', t_host='jaeger-deployment.istio-system', + m_port='9090', m_host='prometheus.istio-system', + c_port='9042', c_hosts=cassandra_hosts, + sinterval='5')) + except Exception as e: + return e + return response.message + + +def stop_collector(stub): + try: + response = stub.StopCollector(collector_pb2.ConfigCollector()) + except Exception as e: + return e + return response.message + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '--cmd', required=True, + help='Command to execute in collector') + args = parser.parse_args() + print(run(vars(args))) diff --git a/clover/collector/grpc/collector_pb2.py b/clover/collector/grpc/collector_pb2.py new file mode 100644 index 0000000..f67c880 --- /dev/null +++ b/clover/collector/grpc/collector_pb2.py @@ -0,0 +1,300 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: collector.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='collector.proto', + package='collector', + syntax='proto3', + serialized_pb=_b('\n\x0f\x63ollector.proto\x12\tcollector\"B\n\x0f\x43onfigCassandra\x12\x17\n\x0f\x63\x61ssandra_hosts\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x61ssandra_port\x18\x02 \x01(\x05\"\x85\x01\n\x0f\x43onfigCollector\x12\x0e\n\x06t_port\x18\x01 \x01(\t\x12\x0e\n\x06t_host\x18\x02 \x01(\t\x12\x0e\n\x06m_port\x18\x03 \x01(\t\x12\x0e\n\x06m_host\x18\x04 \x01(\t\x12\x0e\n\x06\x63_port\x18\x05 \x01(\t\x12\x0f\n\x07\x63_hosts\x18\x06 \x01(\t\x12\x11\n\tsinterval\x18\x07 \x01(\t\"K\n\x07Schemas\x12\x0f\n\x07schemas\x18\x01 \x01(\t\x12\x17\n\x0f\x63\x61ssandra_hosts\x18\x02 \x01(\t\x12\x16\n\x0e\x63\x61ssandra_port\x18\x03 \x01(\x05\"!\n\x0e\x43ollectorReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\xb3\x02\n\nController\x12H\n\rStopCollector\x12\x1a.collector.ConfigCollector\x1a\x19.collector.CollectorReply\"\x00\x12I\n\x0eStartCollector\x12\x1a.collector.ConfigCollector\x1a\x19.collector.CollectorReply\"\x00\x12I\n\x0eInitVisibility\x12\x1a.collector.ConfigCassandra\x1a\x19.collector.CollectorReply\"\x00\x12\x45\n\x12TruncateVisibility\x12\x12.collector.Schemas\x1a\x19.collector.CollectorReply\"\x00\x62\x06proto3') +) + + + + +_CONFIGCASSANDRA = _descriptor.Descriptor( + name='ConfigCassandra', + full_name='collector.ConfigCassandra', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='cassandra_hosts', full_name='collector.ConfigCassandra.cassandra_hosts', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='cassandra_port', full_name='collector.ConfigCassandra.cassandra_port', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=30, + serialized_end=96, +) + + +_CONFIGCOLLECTOR = _descriptor.Descriptor( + name='ConfigCollector', + full_name='collector.ConfigCollector', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='t_port', full_name='collector.ConfigCollector.t_port', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='t_host', full_name='collector.ConfigCollector.t_host', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='m_port', full_name='collector.ConfigCollector.m_port', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='m_host', full_name='collector.ConfigCollector.m_host', index=3, + number=4, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='c_port', full_name='collector.ConfigCollector.c_port', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='c_hosts', full_name='collector.ConfigCollector.c_hosts', index=5, + number=6, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='sinterval', full_name='collector.ConfigCollector.sinterval', index=6, + number=7, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=99, + serialized_end=232, +) + + +_SCHEMAS = _descriptor.Descriptor( + name='Schemas', + full_name='collector.Schemas', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='schemas', full_name='collector.Schemas.schemas', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='cassandra_hosts', full_name='collector.Schemas.cassandra_hosts', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='cassandra_port', full_name='collector.Schemas.cassandra_port', index=2, + number=3, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=234, + serialized_end=309, +) + + +_COLLECTORREPLY = _descriptor.Descriptor( + name='CollectorReply', + full_name='collector.CollectorReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='message', full_name='collector.CollectorReply.message', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=311, + serialized_end=344, +) + +DESCRIPTOR.message_types_by_name['ConfigCassandra'] = _CONFIGCASSANDRA +DESCRIPTOR.message_types_by_name['ConfigCollector'] = _CONFIGCOLLECTOR +DESCRIPTOR.message_types_by_name['Schemas'] = _SCHEMAS +DESCRIPTOR.message_types_by_name['CollectorReply'] = _COLLECTORREPLY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +ConfigCassandra = _reflection.GeneratedProtocolMessageType('ConfigCassandra', (_message.Message,), dict( + DESCRIPTOR = _CONFIGCASSANDRA, + __module__ = 'collector_pb2' + # @@protoc_insertion_point(class_scope:collector.ConfigCassandra) + )) +_sym_db.RegisterMessage(ConfigCassandra) + +ConfigCollector = _reflection.GeneratedProtocolMessageType('ConfigCollector', (_message.Message,), dict( + DESCRIPTOR = _CONFIGCOLLECTOR, + __module__ = 'collector_pb2' + # @@protoc_insertion_point(class_scope:collector.ConfigCollector) + )) +_sym_db.RegisterMessage(ConfigCollector) + +Schemas = _reflection.GeneratedProtocolMessageType('Schemas', (_message.Message,), dict( + DESCRIPTOR = _SCHEMAS, + __module__ = 'collector_pb2' + # @@protoc_insertion_point(class_scope:collector.Schemas) + )) +_sym_db.RegisterMessage(Schemas) + +CollectorReply = _reflection.GeneratedProtocolMessageType('CollectorReply', (_message.Message,), dict( + DESCRIPTOR = _COLLECTORREPLY, + __module__ = 'collector_pb2' + # @@protoc_insertion_point(class_scope:collector.CollectorReply) + )) +_sym_db.RegisterMessage(CollectorReply) + + + +_CONTROLLER = _descriptor.ServiceDescriptor( + name='Controller', + full_name='collector.Controller', + file=DESCRIPTOR, + index=0, + options=None, + serialized_start=347, + serialized_end=654, + methods=[ + _descriptor.MethodDescriptor( + name='StopCollector', + full_name='collector.Controller.StopCollector', + index=0, + containing_service=None, + input_type=_CONFIGCOLLECTOR, + output_type=_COLLECTORREPLY, + options=None, + ), + _descriptor.MethodDescriptor( + name='StartCollector', + full_name='collector.Controller.StartCollector', + index=1, + containing_service=None, + input_type=_CONFIGCOLLECTOR, + output_type=_COLLECTORREPLY, + options=None, + ), + _descriptor.MethodDescriptor( + name='InitVisibility', + full_name='collector.Controller.InitVisibility', + index=2, + containing_service=None, + input_type=_CONFIGCASSANDRA, + output_type=_COLLECTORREPLY, + options=None, + ), + _descriptor.MethodDescriptor( + name='TruncateVisibility', + full_name='collector.Controller.TruncateVisibility', + index=3, + containing_service=None, + input_type=_SCHEMAS, + output_type=_COLLECTORREPLY, + options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_CONTROLLER) + +DESCRIPTOR.services_by_name['Controller'] = _CONTROLLER + +# @@protoc_insertion_point(module_scope) diff --git a/clover/collector/grpc/collector_pb2_grpc.py b/clover/collector/grpc/collector_pb2_grpc.py new file mode 100644 index 0000000..a7be73c --- /dev/null +++ b/clover/collector/grpc/collector_pb2_grpc.py @@ -0,0 +1,97 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import collector_pb2 as collector__pb2 + + +class ControllerStub(object): + """The controller service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.StopCollector = channel.unary_unary( + '/collector.Controller/StopCollector', + request_serializer=collector__pb2.ConfigCollector.SerializeToString, + response_deserializer=collector__pb2.CollectorReply.FromString, + ) + self.StartCollector = channel.unary_unary( + '/collector.Controller/StartCollector', + request_serializer=collector__pb2.ConfigCollector.SerializeToString, + response_deserializer=collector__pb2.CollectorReply.FromString, + ) + self.InitVisibility = channel.unary_unary( + '/collector.Controller/InitVisibility', + request_serializer=collector__pb2.ConfigCassandra.SerializeToString, + response_deserializer=collector__pb2.CollectorReply.FromString, + ) + self.TruncateVisibility = channel.unary_unary( + '/collector.Controller/TruncateVisibility', + request_serializer=collector__pb2.Schemas.SerializeToString, + response_deserializer=collector__pb2.CollectorReply.FromString, + ) + + +class ControllerServicer(object): + """The controller service definition. + """ + + def StopCollector(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def StartCollector(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def InitVisibility(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TruncateVisibility(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_ControllerServicer_to_server(servicer, server): + rpc_method_handlers = { + 'StopCollector': grpc.unary_unary_rpc_method_handler( + servicer.StopCollector, + request_deserializer=collector__pb2.ConfigCollector.FromString, + response_serializer=collector__pb2.CollectorReply.SerializeToString, + ), + 'StartCollector': grpc.unary_unary_rpc_method_handler( + servicer.StartCollector, + request_deserializer=collector__pb2.ConfigCollector.FromString, + response_serializer=collector__pb2.CollectorReply.SerializeToString, + ), + 'InitVisibility': grpc.unary_unary_rpc_method_handler( + servicer.InitVisibility, + request_deserializer=collector__pb2.ConfigCassandra.FromString, + response_serializer=collector__pb2.CollectorReply.SerializeToString, + ), + 'TruncateVisibility': grpc.unary_unary_rpc_method_handler( + servicer.TruncateVisibility, + request_deserializer=collector__pb2.Schemas.FromString, + response_serializer=collector__pb2.CollectorReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'collector.Controller', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py new file mode 100644 index 0000000..c2eb221 --- /dev/null +++ b/clover/collector/grpc/collector_server.py @@ -0,0 +1,98 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + + +from concurrent import futures +from clover.collector.db.cassops import CassandraOps +import time +import sys +import grpc +import subprocess +import pickle +import logging +import collector_pb2 +import collector_pb2_grpc + + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 +GRPC_PORT = '[::]:50054' + + +class Controller(collector_pb2_grpc.ControllerServicer): + + def __init__(self, init_visibility): + logging.basicConfig(filename='collector_server.log', + level=logging.DEBUG) + self.collector = 0 + if init_visibility == 'set_schemas': + cassandra_hosts = pickle.dumps(['cassandra.default']) + self.InitVisibility(collector_pb2.ConfigCassandra( + cassandra_port=9042, cassandra_hosts=cassandra_hosts), "") + + def StopCollector(self, r, context): + try: + subprocess.Popen.kill(self.collector) + msg = "Stopped collector on pid: {}".format(self.collector.pid) + except Exception as e: + logging.debug(e) + msg = "Failed to stop collector" + return collector_pb2.CollectorReply(message=msg) + + def StartCollector(self, r, context): + try: + self.collector = subprocess.Popen( + ["python", "process/collect.py", + "-sinterval={}".format(r.sinterval), + "-c_port={}".format(r.c_port), + "-t_port={}".format(r.t_port), "-t_host={}".format(r.t_host), + "-m_port={}".format(r.m_port), "-m_host={}".format(r.m_host), + "-c_hosts={}".format(pickle.loads(r.c_hosts)), "&"], + shell=False) + msg = "Started collector on pid: {}".format(self.collector.pid) + except Exception as e: + logging.debug(e) + msg = e + return collector_pb2.CollectorReply(message=msg) + + def InitVisibility(self, r, context): + try: + cass = CassandraOps(pickle.loads(r.cassandra_hosts), + r.cassandra_port) + cass.init_visibility() + msg = "Added visibility schemas in cassandra" + except Exception as e: + logging.debug(e) + msg = "Failed to initialize cassandra" + return collector_pb2.CollectorReply(message=msg) + + def TruncateVisibility(self, r, context): + try: + cass = CassandraOps(pickle.loads(r.cassandra_hosts), + r.cassandra_port) + cass.truncate(pickle.loads(r.schemas)) + msg = "Truncated visibility tables" + except Exception as e: + logging.debug(e) + msg = "Failed to truncate visibility" + return collector_pb2.CollectorReply(message=msg) + + +def serve(init_visibility): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + collector_pb2_grpc.add_ControllerServicer_to_server( + Controller(init_visibility), server) + server.add_insecure_port(GRPC_PORT) + server.start() + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(0) + + +if __name__ == '__main__': + serve(sys.argv[1]) |