From dbece18d19c3977019c6727fcbe7a436031666fe Mon Sep 17 00:00:00 2001 From: Eddie Arrage Date: Wed, 9 May 2018 18:33:55 +0000 Subject: Initial commit for Clover Collector - Added a container named clover-collector using clover container as a base with build script - GRPC server to manage collector process - Cassandra DB client interface to initialize visibility keyspace - Init messaging adds table schemas for tracing - traces & spans - Adds table for monitoring - metrics - Does not implement Cassandra server but developed using public Cassandra docker container - Collector process in simple loop that periodically fetches traces and monitoring data and inserts to Cassandra - not optimized for batch retrieval yet for monitoring - CLI interface added to collector process and used by GRPC server for configuration - Simple GRPC client script to test GRPC server and start/stop of collector process - Collector process can be configured with access for tracing, monitoring and Cassandra - Added a return value in monitoring query method - Added ability to truncate tracing, metrics and spans tables in cql - Added cql prepared statements and batch insert for metrics and spans - Align cql connection to cql deployment within k8s - Fix issue with cql host list using ast and collect process args with background argument - Added redis interface to accept service/metric list externally for monitoring (will work in conjunction with clover-controller) - Use k8s DNS names and default ports for monitoring, tracing and cassandra - Added yaml manifest renderer/template for collector Change-Id: I3e4353e28844c4ce9c185ff4638012b66c7fff67 Signed-off-by: Eddie Arrage --- clover/collector/__init__.py | 0 clover/collector/build.sh | 16 ++ clover/collector/db/__init__.py | 0 clover/collector/db/cassops.py | 144 +++++++++++++ clover/collector/db/redisops.py | 59 ++++++ clover/collector/docker/Dockerfile | 30 +++ clover/collector/grpc/__init__.py | 0 clover/collector/grpc/build_proto.sh | 11 + clover/collector/grpc/collector.proto | 45 +++++ clover/collector/grpc/collector_client.py | 105 ++++++++++ clover/collector/grpc/collector_pb2.py | 300 ++++++++++++++++++++++++++++ clover/collector/grpc/collector_pb2_grpc.py | 97 +++++++++ clover/collector/grpc/collector_server.py | 98 +++++++++ clover/collector/process/__init__.py | 0 clover/collector/process/collect.py | 162 +++++++++++++++ clover/collector/process/grpc_process.sh | 11 + clover/collector/yaml/manifest.template | 43 ++++ clover/collector/yaml/render_yaml.py | 73 +++++++ clover/monitoring/monitoring.py | 5 +- 19 files changed, 1197 insertions(+), 2 deletions(-) create mode 100644 clover/collector/__init__.py create mode 100755 clover/collector/build.sh create mode 100644 clover/collector/db/__init__.py create mode 100644 clover/collector/db/cassops.py create mode 100644 clover/collector/db/redisops.py create mode 100644 clover/collector/docker/Dockerfile create mode 100644 clover/collector/grpc/__init__.py create mode 100755 clover/collector/grpc/build_proto.sh create mode 100644 clover/collector/grpc/collector.proto create mode 100644 clover/collector/grpc/collector_client.py create mode 100644 clover/collector/grpc/collector_pb2.py create mode 100644 clover/collector/grpc/collector_pb2_grpc.py create mode 100644 clover/collector/grpc/collector_server.py create mode 100644 clover/collector/process/__init__.py create mode 100644 clover/collector/process/collect.py create mode 100755 clover/collector/process/grpc_process.sh create mode 100644 clover/collector/yaml/manifest.template create mode 100644 clover/collector/yaml/render_yaml.py diff --git a/clover/collector/__init__.py b/clover/collector/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clover/collector/build.sh b/clover/collector/build.sh new file mode 100755 index 0000000..f305a02 --- /dev/null +++ b/clover/collector/build.sh @@ -0,0 +1,16 @@ +#!/bin/bash +# +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# + +IMAGE_PATH=${IMAGE_PATH:-"localhost:5000"} +IMAGE_NAME=${IMAGE_NAME:-"clover-collector"} + +docker build -f docker/Dockerfile -t $IMAGE_NAME . +docker tag $IMAGE_NAME $IMAGE_PATH/$IMAGE_NAME +docker push $IMAGE_PATH/$IMAGE_NAME diff --git a/clover/collector/db/__init__.py b/clover/collector/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py new file mode 100644 index 0000000..6553cff --- /dev/null +++ b/clover/collector/db/cassops.py @@ -0,0 +1,144 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +from cassandra.cluster import Cluster +from cassandra.query import BatchStatement +import logging + +CASSANDRA_HOSTS = ['cassandra.default'] + + +class CassandraOps: + + def __init__(self, hosts, port=9042, keyspace='visibility'): + logging.basicConfig(filename='cassops.log', + level=logging.DEBUG) + cluster = Cluster(hosts, port=port) + self.session = cluster.connect() + self.keyspace = keyspace + + def truncate(self, tables=['traces', 'metrics', 'spans']): + self.session.set_keyspace(self.keyspace) + try: + for table in tables: + self.session.execute(""" + TRUNCATE %s + """ % table) + except Exception as e: + logging.debug(e) + + def init_visibility(self): + try: + self.session.execute(""" + CREATE KEYSPACE %s + WITH replication = { 'class': 'SimpleStrategy', + 'replication_factor': '1' } + """ % self.keyspace) + except Exception as e: + logging.debug(e) + + self.session.set_keyspace(self.keyspace) + + try: + self.session.execute(""" + CREATE TABLE IF NOT EXISTS traces ( + traceid text, + processes list, + PRIMARY KEY (traceid) + ) + """) + + self.session.execute(""" + CREATE TABLE IF NOT EXISTS spans ( + spanid text, + traceid text, + duration int, + start_time int, + processid text, + operation_name text, + node_id text, + http_url text, + upstream_cluster text, + PRIMARY KEY (spanid, traceid) + ) + """) + + self.session.execute(""" + CREATE TABLE IF NOT EXISTS metrics ( + m_name text, + m_value text, + m_time text, + service text, + monitor_time timestamp, + PRIMARY KEY (m_name, monitor_time) + ) + """) + except Exception as e: + logging.debug(e) + + def set_prepared(self): + self.session.set_keyspace(self.keyspace) + self.insert_tracing_stmt = self.session.prepare( + """ + INSERT INTO spans (spanid, traceid, duration, operation_name, + node_id, http_url, upstream_cluster) + VALUES (?, ?, ?, ?, ?, ?, ?) + """ + ) + self.insert_metric_stmt = self.session.prepare( + """ + INSERT INTO metrics + (m_name, m_value, m_time, service, monitor_time) + VALUES (?, ?, ?, ?, toTimestamp(now())) + """ + ) + + def set_batch(self): + self.batch = BatchStatement() + + def execute_batch(self): + self.session.execute(self.batch) + + def insert_tracing(self, table, traceid, s, tags): + self.session.set_keyspace(self.keyspace) + if 'upstream_cluster' not in tags: + logging.debug('NO UPSTREAM_CLUSTER KEY') + tags['upstream_cluster'] = 'none' + try: + self.batch.add(self.insert_tracing_stmt, + (s['spanID'], traceid, s['duration'], + s['operationName'], tags['node_id'], + tags['http.url'], tags['upstream_cluster'])) + except Exception as e: + logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid, + s['duration'], s['operationName'], tags['node_id'], + tags['http.url'], tags['upstream_cluster'])) + logging.debug(e) + + def insert_trace(self, traceid, processes): + self.session.set_keyspace(self.keyspace) + self.session.execute( + """ + INSERT INTO traces (traceid, processes) + VALUES (%s, %s) + """, + (traceid, processes) + ) + + def insert_metric(self, m_name, m_value, m_time, service): + self.session.set_keyspace(self.keyspace) + self.batch.add(self.insert_metric_stmt, + (m_name, m_value, m_time, service)) + + +def main(): + cass = CassandraOps(CASSANDRA_HOSTS) + cass.init_visibility() + + +if __name__ == '__main__': + main() diff --git a/clover/collector/db/redisops.py b/clover/collector/db/redisops.py new file mode 100644 index 0000000..e80c417 --- /dev/null +++ b/clover/collector/db/redisops.py @@ -0,0 +1,59 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +import redis +import logging + +REDIS_HOST = 'redis' +# REDIS_HOST = '10.244.0.85' + + +class RedisOps: + + def __init__(self, host=REDIS_HOST): + logging.basicConfig(filename='redisops.log', + level=logging.DEBUG) + try: + self.r = redis.StrictRedis(host=host, port=6379, db=0) + except Exception as e: + logging.debug(e) + + def init_services(self, skey='visibility_services'): + service_names = ['http_lb', 'proxy_access_control'] + for s in service_names: + self.r.sadd(skey, s) + + def init_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'): + metric_prefixes = ['envoy_cluster_out_', 'envoy_cluster_in_'] + metric_suffixes = [ + '_default_svc_cluster_local_http_internal_upstream_rq_2xx', + '_default_svc_cluster_local_http_upstream_cx_active'] + for p in metric_prefixes: + self.r.sadd(pkey, p) + for s in metric_suffixes: + self.r.sadd(skey, s) + + def get_services(self, skey='visibility_services'): + services = self.r.smembers(skey) + return services + + def get_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'): + prefixes = self.r.smembers(pkey) + suffixes = self.r.smembers(skey) + return prefixes, suffixes + + +def main(): + r = RedisOps() + r.init_services() + r.init_metrics() + r.get_services() + r.get_metrics() + + +if __name__ == '__main__': + main() diff --git a/clover/collector/docker/Dockerfile b/clover/collector/docker/Dockerfile new file mode 100644 index 0000000..1714420 --- /dev/null +++ b/clover/collector/docker/Dockerfile @@ -0,0 +1,30 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +FROM opnfv/clover + +ENV REPOS_DIR="/home/opnfv/repos" + +# Clover repo +ENV CLOVER_REPO_DIR="${REPOS_DIR}/clover" + +# Install required python packages +RUN python -m pip install cassandra-driver redis + +# Set work directory +WORKDIR ${CLOVER_REPO_DIR} + +COPY /process clover/collector/process +COPY /grpc clover/collector/grpc +COPY /db clover/collector/db +COPY __init__.py clover/collector/__init__.py + +RUN pip install . + +WORKDIR "${CLOVER_REPO_DIR}/clover/collector" + +CMD ./process/grpc_process.sh no_schema_init diff --git a/clover/collector/grpc/__init__.py b/clover/collector/grpc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clover/collector/grpc/build_proto.sh b/clover/collector/grpc/build_proto.sh new file mode 100755 index 0000000..44467ad --- /dev/null +++ b/clover/collector/grpc/build_proto.sh @@ -0,0 +1,11 @@ +#!/bin/bash +# +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# + +python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. collector.proto diff --git a/clover/collector/grpc/collector.proto b/clover/collector/grpc/collector.proto new file mode 100644 index 0000000..fc8b636 --- /dev/null +++ b/clover/collector/grpc/collector.proto @@ -0,0 +1,45 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +syntax = "proto3"; + +package collector; + +// The controller service definition. +service Controller { + + rpc StopCollector (ConfigCollector) returns (CollectorReply) {} + rpc StartCollector (ConfigCollector) returns (CollectorReply) {} + rpc InitVisibility (ConfigCassandra) returns (CollectorReply) {} + rpc TruncateVisibility (Schemas) returns (CollectorReply) {} +} + +message ConfigCassandra { + string cassandra_hosts = 1; + int32 cassandra_port = 2; +} + +message ConfigCollector { + string t_port = 1; + string t_host = 2; + string m_port = 3; + string m_host = 4; + string c_port = 5; + string c_hosts = 6; + string sinterval = 7; +} + +message Schemas { + string schemas = 1; + string cassandra_hosts = 2; + int32 cassandra_port = 3; + +} + +message CollectorReply { + string message = 1; +} diff --git a/clover/collector/grpc/collector_client.py b/clover/collector/grpc/collector_client.py new file mode 100644 index 0000000..b9e9f67 --- /dev/null +++ b/clover/collector/grpc/collector_client.py @@ -0,0 +1,105 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +from __future__ import print_function +from kubernetes import client, config + +import grpc +import argparse +import pickle + +import collector_pb2 +import collector_pb2_grpc + +# This is a basic client script to test server GRPC messaging +# TODO improve interface overall + + +def run(args, grpc_port='50054'): + pod_ip = get_podip('clover-collector') + if pod_ip == '': + return "Can not find service: {}".format(args['service_name']) + collector_grpc = pod_ip + ':' + grpc_port + channel = grpc.insecure_channel(collector_grpc) + stub = collector_pb2_grpc.ControllerStub(channel) + + if args['cmd'] == 'init': + return init_visibility(stub) + elif args['cmd'] == 'start': + return start_collector(stub) + elif args['cmd'] == 'stop': + return stop_collector(stub) + elif args['cmd'] == 'clean': + return clean_visibility(stub) + else: + return "Invalid command: {}".format(args['cmd']) + + +def get_podip(pod_name): + ip = '' + if pod_name != '': + config.load_kube_config() + v1 = client.CoreV1Api() + ret = v1.list_pod_for_all_namespaces(watch=False) + for i in ret.items: + if i.metadata.name.lower().find(pod_name.lower()) != -1: + print("Pod IP: {}".format(i.status.pod_ip)) + ip = i.status.pod_ip + return str(ip) + return str(ip) + + +def init_visibility(stub): + try: + cassandra_hosts = pickle.dumps(['cassandra.default']) + response = stub.InitVisibility(collector_pb2.ConfigCassandra( + cassandra_hosts=cassandra_hosts, cassandra_port=9042)) + except Exception as e: + return e + return response.message + + +def clean_visibility(stub): + try: + cassandra_hosts = pickle.dumps(['cassandra.default']) + schemas = pickle.dumps(['spans', 'traces', 'metrics']) + response = stub.TruncateVisibility(collector_pb2.Schemas( + schemas=schemas, cassandra_hosts=cassandra_hosts, + cassandra_port=9042)) + except Exception as e: + return e + return response.message + + +def start_collector(stub): + try: + cassandra_hosts = pickle.dumps(['cassandra.default']) + response = stub.StartCollector(collector_pb2.ConfigCollector( + t_port='16686', t_host='jaeger-deployment.istio-system', + m_port='9090', m_host='prometheus.istio-system', + c_port='9042', c_hosts=cassandra_hosts, + sinterval='5')) + except Exception as e: + return e + return response.message + + +def stop_collector(stub): + try: + response = stub.StopCollector(collector_pb2.ConfigCollector()) + except Exception as e: + return e + return response.message + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '--cmd', required=True, + help='Command to execute in collector') + args = parser.parse_args() + print(run(vars(args))) diff --git a/clover/collector/grpc/collector_pb2.py b/clover/collector/grpc/collector_pb2.py new file mode 100644 index 0000000..f67c880 --- /dev/null +++ b/clover/collector/grpc/collector_pb2.py @@ -0,0 +1,300 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: collector.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='collector.proto', + package='collector', + syntax='proto3', + serialized_pb=_b('\n\x0f\x63ollector.proto\x12\tcollector\"B\n\x0f\x43onfigCassandra\x12\x17\n\x0f\x63\x61ssandra_hosts\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x61ssandra_port\x18\x02 \x01(\x05\"\x85\x01\n\x0f\x43onfigCollector\x12\x0e\n\x06t_port\x18\x01 \x01(\t\x12\x0e\n\x06t_host\x18\x02 \x01(\t\x12\x0e\n\x06m_port\x18\x03 \x01(\t\x12\x0e\n\x06m_host\x18\x04 \x01(\t\x12\x0e\n\x06\x63_port\x18\x05 \x01(\t\x12\x0f\n\x07\x63_hosts\x18\x06 \x01(\t\x12\x11\n\tsinterval\x18\x07 \x01(\t\"K\n\x07Schemas\x12\x0f\n\x07schemas\x18\x01 \x01(\t\x12\x17\n\x0f\x63\x61ssandra_hosts\x18\x02 \x01(\t\x12\x16\n\x0e\x63\x61ssandra_port\x18\x03 \x01(\x05\"!\n\x0e\x43ollectorReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\xb3\x02\n\nController\x12H\n\rStopCollector\x12\x1a.collector.ConfigCollector\x1a\x19.collector.CollectorReply\"\x00\x12I\n\x0eStartCollector\x12\x1a.collector.ConfigCollector\x1a\x19.collector.CollectorReply\"\x00\x12I\n\x0eInitVisibility\x12\x1a.collector.ConfigCassandra\x1a\x19.collector.CollectorReply\"\x00\x12\x45\n\x12TruncateVisibility\x12\x12.collector.Schemas\x1a\x19.collector.CollectorReply\"\x00\x62\x06proto3') +) + + + + +_CONFIGCASSANDRA = _descriptor.Descriptor( + name='ConfigCassandra', + full_name='collector.ConfigCassandra', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='cassandra_hosts', full_name='collector.ConfigCassandra.cassandra_hosts', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='cassandra_port', full_name='collector.ConfigCassandra.cassandra_port', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=30, + serialized_end=96, +) + + +_CONFIGCOLLECTOR = _descriptor.Descriptor( + name='ConfigCollector', + full_name='collector.ConfigCollector', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='t_port', full_name='collector.ConfigCollector.t_port', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='t_host', full_name='collector.ConfigCollector.t_host', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='m_port', full_name='collector.ConfigCollector.m_port', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='m_host', full_name='collector.ConfigCollector.m_host', index=3, + number=4, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='c_port', full_name='collector.ConfigCollector.c_port', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='c_hosts', full_name='collector.ConfigCollector.c_hosts', index=5, + number=6, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='sinterval', full_name='collector.ConfigCollector.sinterval', index=6, + number=7, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=99, + serialized_end=232, +) + + +_SCHEMAS = _descriptor.Descriptor( + name='Schemas', + full_name='collector.Schemas', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='schemas', full_name='collector.Schemas.schemas', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='cassandra_hosts', full_name='collector.Schemas.cassandra_hosts', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='cassandra_port', full_name='collector.Schemas.cassandra_port', index=2, + number=3, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=234, + serialized_end=309, +) + + +_COLLECTORREPLY = _descriptor.Descriptor( + name='CollectorReply', + full_name='collector.CollectorReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='message', full_name='collector.CollectorReply.message', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=311, + serialized_end=344, +) + +DESCRIPTOR.message_types_by_name['ConfigCassandra'] = _CONFIGCASSANDRA +DESCRIPTOR.message_types_by_name['ConfigCollector'] = _CONFIGCOLLECTOR +DESCRIPTOR.message_types_by_name['Schemas'] = _SCHEMAS +DESCRIPTOR.message_types_by_name['CollectorReply'] = _COLLECTORREPLY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +ConfigCassandra = _reflection.GeneratedProtocolMessageType('ConfigCassandra', (_message.Message,), dict( + DESCRIPTOR = _CONFIGCASSANDRA, + __module__ = 'collector_pb2' + # @@protoc_insertion_point(class_scope:collector.ConfigCassandra) + )) +_sym_db.RegisterMessage(ConfigCassandra) + +ConfigCollector = _reflection.GeneratedProtocolMessageType('ConfigCollector', (_message.Message,), dict( + DESCRIPTOR = _CONFIGCOLLECTOR, + __module__ = 'collector_pb2' + # @@protoc_insertion_point(class_scope:collector.ConfigCollector) + )) +_sym_db.RegisterMessage(ConfigCollector) + +Schemas = _reflection.GeneratedProtocolMessageType('Schemas', (_message.Message,), dict( + DESCRIPTOR = _SCHEMAS, + __module__ = 'collector_pb2' + # @@protoc_insertion_point(class_scope:collector.Schemas) + )) +_sym_db.RegisterMessage(Schemas) + +CollectorReply = _reflection.GeneratedProtocolMessageType('CollectorReply', (_message.Message,), dict( + DESCRIPTOR = _COLLECTORREPLY, + __module__ = 'collector_pb2' + # @@protoc_insertion_point(class_scope:collector.CollectorReply) + )) +_sym_db.RegisterMessage(CollectorReply) + + + +_CONTROLLER = _descriptor.ServiceDescriptor( + name='Controller', + full_name='collector.Controller', + file=DESCRIPTOR, + index=0, + options=None, + serialized_start=347, + serialized_end=654, + methods=[ + _descriptor.MethodDescriptor( + name='StopCollector', + full_name='collector.Controller.StopCollector', + index=0, + containing_service=None, + input_type=_CONFIGCOLLECTOR, + output_type=_COLLECTORREPLY, + options=None, + ), + _descriptor.MethodDescriptor( + name='StartCollector', + full_name='collector.Controller.StartCollector', + index=1, + containing_service=None, + input_type=_CONFIGCOLLECTOR, + output_type=_COLLECTORREPLY, + options=None, + ), + _descriptor.MethodDescriptor( + name='InitVisibility', + full_name='collector.Controller.InitVisibility', + index=2, + containing_service=None, + input_type=_CONFIGCASSANDRA, + output_type=_COLLECTORREPLY, + options=None, + ), + _descriptor.MethodDescriptor( + name='TruncateVisibility', + full_name='collector.Controller.TruncateVisibility', + index=3, + containing_service=None, + input_type=_SCHEMAS, + output_type=_COLLECTORREPLY, + options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_CONTROLLER) + +DESCRIPTOR.services_by_name['Controller'] = _CONTROLLER + +# @@protoc_insertion_point(module_scope) diff --git a/clover/collector/grpc/collector_pb2_grpc.py b/clover/collector/grpc/collector_pb2_grpc.py new file mode 100644 index 0000000..a7be73c --- /dev/null +++ b/clover/collector/grpc/collector_pb2_grpc.py @@ -0,0 +1,97 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import collector_pb2 as collector__pb2 + + +class ControllerStub(object): + """The controller service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.StopCollector = channel.unary_unary( + '/collector.Controller/StopCollector', + request_serializer=collector__pb2.ConfigCollector.SerializeToString, + response_deserializer=collector__pb2.CollectorReply.FromString, + ) + self.StartCollector = channel.unary_unary( + '/collector.Controller/StartCollector', + request_serializer=collector__pb2.ConfigCollector.SerializeToString, + response_deserializer=collector__pb2.CollectorReply.FromString, + ) + self.InitVisibility = channel.unary_unary( + '/collector.Controller/InitVisibility', + request_serializer=collector__pb2.ConfigCassandra.SerializeToString, + response_deserializer=collector__pb2.CollectorReply.FromString, + ) + self.TruncateVisibility = channel.unary_unary( + '/collector.Controller/TruncateVisibility', + request_serializer=collector__pb2.Schemas.SerializeToString, + response_deserializer=collector__pb2.CollectorReply.FromString, + ) + + +class ControllerServicer(object): + """The controller service definition. + """ + + def StopCollector(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def StartCollector(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def InitVisibility(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TruncateVisibility(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_ControllerServicer_to_server(servicer, server): + rpc_method_handlers = { + 'StopCollector': grpc.unary_unary_rpc_method_handler( + servicer.StopCollector, + request_deserializer=collector__pb2.ConfigCollector.FromString, + response_serializer=collector__pb2.CollectorReply.SerializeToString, + ), + 'StartCollector': grpc.unary_unary_rpc_method_handler( + servicer.StartCollector, + request_deserializer=collector__pb2.ConfigCollector.FromString, + response_serializer=collector__pb2.CollectorReply.SerializeToString, + ), + 'InitVisibility': grpc.unary_unary_rpc_method_handler( + servicer.InitVisibility, + request_deserializer=collector__pb2.ConfigCassandra.FromString, + response_serializer=collector__pb2.CollectorReply.SerializeToString, + ), + 'TruncateVisibility': grpc.unary_unary_rpc_method_handler( + servicer.TruncateVisibility, + request_deserializer=collector__pb2.Schemas.FromString, + response_serializer=collector__pb2.CollectorReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'collector.Controller', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py new file mode 100644 index 0000000..c2eb221 --- /dev/null +++ b/clover/collector/grpc/collector_server.py @@ -0,0 +1,98 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + + +from concurrent import futures +from clover.collector.db.cassops import CassandraOps +import time +import sys +import grpc +import subprocess +import pickle +import logging +import collector_pb2 +import collector_pb2_grpc + + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 +GRPC_PORT = '[::]:50054' + + +class Controller(collector_pb2_grpc.ControllerServicer): + + def __init__(self, init_visibility): + logging.basicConfig(filename='collector_server.log', + level=logging.DEBUG) + self.collector = 0 + if init_visibility == 'set_schemas': + cassandra_hosts = pickle.dumps(['cassandra.default']) + self.InitVisibility(collector_pb2.ConfigCassandra( + cassandra_port=9042, cassandra_hosts=cassandra_hosts), "") + + def StopCollector(self, r, context): + try: + subprocess.Popen.kill(self.collector) + msg = "Stopped collector on pid: {}".format(self.collector.pid) + except Exception as e: + logging.debug(e) + msg = "Failed to stop collector" + return collector_pb2.CollectorReply(message=msg) + + def StartCollector(self, r, context): + try: + self.collector = subprocess.Popen( + ["python", "process/collect.py", + "-sinterval={}".format(r.sinterval), + "-c_port={}".format(r.c_port), + "-t_port={}".format(r.t_port), "-t_host={}".format(r.t_host), + "-m_port={}".format(r.m_port), "-m_host={}".format(r.m_host), + "-c_hosts={}".format(pickle.loads(r.c_hosts)), "&"], + shell=False) + msg = "Started collector on pid: {}".format(self.collector.pid) + except Exception as e: + logging.debug(e) + msg = e + return collector_pb2.CollectorReply(message=msg) + + def InitVisibility(self, r, context): + try: + cass = CassandraOps(pickle.loads(r.cassandra_hosts), + r.cassandra_port) + cass.init_visibility() + msg = "Added visibility schemas in cassandra" + except Exception as e: + logging.debug(e) + msg = "Failed to initialize cassandra" + return collector_pb2.CollectorReply(message=msg) + + def TruncateVisibility(self, r, context): + try: + cass = CassandraOps(pickle.loads(r.cassandra_hosts), + r.cassandra_port) + cass.truncate(pickle.loads(r.schemas)) + msg = "Truncated visibility tables" + except Exception as e: + logging.debug(e) + msg = "Failed to truncate visibility" + return collector_pb2.CollectorReply(message=msg) + + +def serve(init_visibility): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + collector_pb2_grpc.add_ControllerServicer_to_server( + Controller(init_visibility), server) + server.add_insecure_port(GRPC_PORT) + server.start() + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(0) + + +if __name__ == '__main__': + serve(sys.argv[1]) diff --git a/clover/collector/process/__init__.py b/clover/collector/process/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py new file mode 100644 index 0000000..d8beb49 --- /dev/null +++ b/clover/collector/process/collect.py @@ -0,0 +1,162 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +from clover.tracing.tracing import Tracing +from clover.monitoring.monitoring import Monitoring +from clover.collector.db.cassops import CassandraOps +from clover.collector.db.redisops import RedisOps + +# import pprint +import time +import argparse +import logging +import ast + +TRACING_SERVICES = ['istio-ingress'] +TRACING_PORT = "16686" +MONITORING_PORT = "9090" +CASSANDRA_PORT = 9042 # Provide as integer +MONITORING_HOST = "prometheus.istio-system" +TRACING_HOST = "jaeger-deployment.istio-system" +CASSANDRA_HOSTS = ['cassandra.default'] + + +class Collector: + + def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts): + logging.basicConfig(filename='collector.log', level=logging.DEBUG) + try: + self.t = Tracing(t_host, t_port, '', False) + monitoring_url = "http://{}:{}".format(m_host, m_port) + self.m = Monitoring(monitoring_url) + self.c = CassandraOps(c_hosts, int(c_port)) + self.c.set_prepared() + self.r = RedisOps() + except Exception as e: + logging.debug(e) + + # Toplevel tracing retrieval and batch insert + def get_tracing(self, services, time_back=20): + self.c.set_batch() + for service in services: + traces = self.t.getTraces(service, time_back) + try: + self.set_tracing(traces) + except Exception as e: + logging.debug(e) + self.c.execute_batch() + + # Insert to cassandra visibility traces and spans tables + def set_tracing(self, trace): + for traces in trace['data']: + for spans in traces['spans']: + span = {} + span['spanID'] = spans['spanID'] + span['duration'] = spans['duration'] + span['startTime'] = spans['startTime'] + span['operationName'] = spans['operationName'] + tag = {} + for tags in spans['tags']: + tag[tags['key']] = tags['value'] + self.c.insert_tracing('spans', traces['traceID'], + span, tag) + process_list = [] + for p in traces['processes']: + process_list.append(p) + service_names = [] + for pname in process_list: + service_names.append(traces['processes'][pname]['serviceName']) + self.c.insert_trace(traces['traceID'], service_names) + + # Insert to cassandra visibility metrics table + def get_monitoring(self): + + # Fetch collector service/metric lists from redis + service_names = self.r.get_services() + metric_prefixes, metric_suffixes = self.r.get_metrics() + + self.c.set_batch() + for sname in service_names: + for prefix in metric_prefixes: + for suffix in metric_suffixes: + try: + metric_name = prefix + sname + suffix + query_params = { + "type": "instant", + "query": metric_name + } + data = self.m.query(query_params) + m_value = data['data']['result'][0]['value'][1] + m_time = data['data']['result'][0]['value'][0] + mn = data['data']['result'][0]['metric']['__name__'] + self.c.insert_metric(mn, m_value, str(m_time), sname) + except Exception as e: + logging.debug(e) + self.c.execute_batch() + + # TODO add batch retrieval for monitoring metrics + # query_range_param = { + # "type": "range", + # "query": "tbd", + # "start": "60m", + # "end": "5m", + # "step": "30s" + # } + # data = self.m.query(query_range_param) + # pp = pprint.PrettyPrinter(indent=2) + # pp.pprint(data) + + +def main(args): + if isinstance(args['c_hosts'], basestring): + ch = ast.literal_eval(args['c_hosts']) + else: + ch = args['c_hosts'] + + c = Collector(args['t_port'], args['t_host'], args['m_port'], + args['m_host'], args['c_port'], ch) + + # Collector loop + loop = True + while loop: + try: + c.get_tracing(args['t_services']) + c.get_monitoring() + time.sleep(int(args['sinterval'])) + except KeyboardInterrupt: + loop = False + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '-sinterval', default=5, + help='Sample interval for collector loop') + parser.add_argument( + '-t_port', default=TRACING_PORT, + help='Port to access Jaeger tracing') + parser.add_argument( + '-m_port', default=MONITORING_PORT, + help='Port to access Prometheus monitoring') + parser.add_argument( + '-t_host', default=TRACING_HOST, + help='Host to access Jaeger tracing') + parser.add_argument( + '-m_host', default=MONITORING_HOST, + help='Host to access Prometheus monitoring') + parser.add_argument( + '-c_hosts', default=CASSANDRA_HOSTS, + help='Host(s) to access Cassandra cluster') + parser.add_argument( + '-c_port', default=CASSANDRA_PORT, + help='Port to access Cassandra cluster') + parser.add_argument( + '-t_services', default=TRACING_SERVICES, + help='Collect services on this list of services') + + args, unknown = parser.parse_known_args() + print(main(vars(args))) diff --git a/clover/collector/process/grpc_process.sh b/clover/collector/process/grpc_process.sh new file mode 100755 index 0000000..30e0171 --- /dev/null +++ b/clover/collector/process/grpc_process.sh @@ -0,0 +1,11 @@ +#!/bin/bash +# +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# + +python grpc/collector_server.py test1 diff --git a/clover/collector/yaml/manifest.template b/clover/collector/yaml/manifest.template new file mode 100644 index 0000000..c7aa3e7 --- /dev/null +++ b/clover/collector/yaml/manifest.template @@ -0,0 +1,43 @@ +--- +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: {{ deploy_name }} + labels: + app: {{ deploy_name }} +spec: + template: + metadata: + labels: + app: {{ deploy_name }} + spec: + containers: + - name: {{ deploy_name }} + image: {{ image_path }}/{{ image_name }}:{{ image_tag }} + ports: + - containerPort: {{ grpc_port }} + - containerPort: {{ redis_port }} + - containerPort: {{ monitor_port }} + - containerPort: {{ trace_port }} + - containerPort: {{ cass_port }} +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ deploy_name }} + labels: + app: {{ deploy_name }} +spec: + ports: + - port: {{ grpc_port }} + name: grpc + - port: {{ redis_port }} + name: redis + - port: {{ trace_port }} + name: jaeger-deployment + - port: {{ monitor_port }} + name: prometheus + - port: {{ cass_port }} + name: cassandra + selector: + app: {{ deploy_name }} diff --git a/clover/collector/yaml/render_yaml.py b/clover/collector/yaml/render_yaml.py new file mode 100644 index 0000000..c1d8be7 --- /dev/null +++ b/clover/collector/yaml/render_yaml.py @@ -0,0 +1,73 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +import argparse + +from jinja2 import Template + + +def render_yaml(args): + template_file = 'manifest.template' + out_file = args['deploy_name'] + '.yaml' + + try: + with open(template_file) as f: + tmpl = Template(f.read()) + output = tmpl.render( + image_path=args['image_path'], + image_name=args['image_name'], + image_tag=args['image_tag'], + deploy_name=args['deploy_name'], + grpc_port=args['grpc_port'], + monitor_port=args['monitor_port'], + redis_port=args['redis_port'], + cass_port=args['cass_port'], + trace_port=args['trace_port'] + ) + with open(out_file, "wb") as fh: + fh.write(output) + return "Generated manifest for {}".format(args['deploy_name']) + except Exception as e: + print(e) + return "Unable to generate manifest for {}".format( + args['deploy_name']) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '--image_name', default='clover-collector', + help='The image name to use') + parser.add_argument( + # '--image_path', default='opnfv', + '--image_path', default='localhost:5000', + help='The path to the image to use') + parser.add_argument( + # '--image_tag', default='opnfv-6.0.0', + '--image_tag', default='latest', + help='The image tag to use') + parser.add_argument( + '--deploy_name', default='clover-collector', + help='The k8s deploy name to use') + parser.add_argument( + '--redis_port', default='6379', + help='The redis port to connect for management') + parser.add_argument( + '--monitor_port', default='9090', + help='The Prometheus monitoring port') + parser.add_argument( + '--grpc_port', default='50054', + help='The GRPC server port for collector management') + parser.add_argument( + '--trace_port', default='16686', + help='The Jaeger tracing port') + parser.add_argument( + '--cass_port', default='9042', + help='The Cassandra port') + + args = parser.parse_args() + print(render_yaml(vars(args))) diff --git a/clover/monitoring/monitoring.py b/clover/monitoring/monitoring.py index 9726fd1..ec97e82 100644 --- a/clover/monitoring/monitoring.py +++ b/clover/monitoring/monitoring.py @@ -90,8 +90,9 @@ class Monitoring(object): print("query %s %s, status=%s, size=%d, dur=%.3f" % \ (self.host, query_params["query"], resp.status_code, len(resp.text), dur)) - pp = pprint.PrettyPrinter(indent=2) - pp.pprint(resp.json()) + #pp = pprint.PrettyPrinter(indent=2) + ##pp.pprint(resp.json()) + return resp.json() except Exception as e: print("ERROR: Could not query prometheus instance %s. \n %s" % (url, e)) -- cgit 1.2.3-korg