summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--clover/collector/__init__.py0
-rwxr-xr-xclover/collector/build.sh16
-rw-r--r--clover/collector/db/__init__.py0
-rw-r--r--clover/collector/db/cassops.py144
-rw-r--r--clover/collector/db/redisops.py59
-rw-r--r--clover/collector/docker/Dockerfile30
-rw-r--r--clover/collector/grpc/__init__.py0
-rwxr-xr-xclover/collector/grpc/build_proto.sh11
-rw-r--r--clover/collector/grpc/collector.proto45
-rw-r--r--clover/collector/grpc/collector_client.py105
-rw-r--r--clover/collector/grpc/collector_pb2.py300
-rw-r--r--clover/collector/grpc/collector_pb2_grpc.py97
-rw-r--r--clover/collector/grpc/collector_server.py98
-rw-r--r--clover/collector/process/__init__.py0
-rw-r--r--clover/collector/process/collect.py162
-rwxr-xr-xclover/collector/process/grpc_process.sh11
-rw-r--r--clover/collector/yaml/manifest.template43
-rw-r--r--clover/collector/yaml/render_yaml.py73
-rw-r--r--clover/monitoring/monitoring.py5
19 files changed, 1197 insertions, 2 deletions
diff --git a/clover/collector/__init__.py b/clover/collector/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/clover/collector/__init__.py
diff --git a/clover/collector/build.sh b/clover/collector/build.sh
new file mode 100755
index 0000000..f305a02
--- /dev/null
+++ b/clover/collector/build.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+#
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+IMAGE_PATH=${IMAGE_PATH:-"localhost:5000"}
+IMAGE_NAME=${IMAGE_NAME:-"clover-collector"}
+
+docker build -f docker/Dockerfile -t $IMAGE_NAME .
+docker tag $IMAGE_NAME $IMAGE_PATH/$IMAGE_NAME
+docker push $IMAGE_PATH/$IMAGE_NAME
diff --git a/clover/collector/db/__init__.py b/clover/collector/db/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/clover/collector/db/__init__.py
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py
new file mode 100644
index 0000000..6553cff
--- /dev/null
+++ b/clover/collector/db/cassops.py
@@ -0,0 +1,144 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+from cassandra.cluster import Cluster
+from cassandra.query import BatchStatement
+import logging
+
+CASSANDRA_HOSTS = ['cassandra.default']
+
+
+class CassandraOps:
+
+ def __init__(self, hosts, port=9042, keyspace='visibility'):
+ logging.basicConfig(filename='cassops.log',
+ level=logging.DEBUG)
+ cluster = Cluster(hosts, port=port)
+ self.session = cluster.connect()
+ self.keyspace = keyspace
+
+ def truncate(self, tables=['traces', 'metrics', 'spans']):
+ self.session.set_keyspace(self.keyspace)
+ try:
+ for table in tables:
+ self.session.execute("""
+ TRUNCATE %s
+ """ % table)
+ except Exception as e:
+ logging.debug(e)
+
+ def init_visibility(self):
+ try:
+ self.session.execute("""
+ CREATE KEYSPACE %s
+ WITH replication = { 'class': 'SimpleStrategy',
+ 'replication_factor': '1' }
+ """ % self.keyspace)
+ except Exception as e:
+ logging.debug(e)
+
+ self.session.set_keyspace(self.keyspace)
+
+ try:
+ self.session.execute("""
+ CREATE TABLE IF NOT EXISTS traces (
+ traceid text,
+ processes list<text>,
+ PRIMARY KEY (traceid)
+ )
+ """)
+
+ self.session.execute("""
+ CREATE TABLE IF NOT EXISTS spans (
+ spanid text,
+ traceid text,
+ duration int,
+ start_time int,
+ processid text,
+ operation_name text,
+ node_id text,
+ http_url text,
+ upstream_cluster text,
+ PRIMARY KEY (spanid, traceid)
+ )
+ """)
+
+ self.session.execute("""
+ CREATE TABLE IF NOT EXISTS metrics (
+ m_name text,
+ m_value text,
+ m_time text,
+ service text,
+ monitor_time timestamp,
+ PRIMARY KEY (m_name, monitor_time)
+ )
+ """)
+ except Exception as e:
+ logging.debug(e)
+
+ def set_prepared(self):
+ self.session.set_keyspace(self.keyspace)
+ self.insert_tracing_stmt = self.session.prepare(
+ """
+ INSERT INTO spans (spanid, traceid, duration, operation_name,
+ node_id, http_url, upstream_cluster)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ """
+ )
+ self.insert_metric_stmt = self.session.prepare(
+ """
+ INSERT INTO metrics
+ (m_name, m_value, m_time, service, monitor_time)
+ VALUES (?, ?, ?, ?, toTimestamp(now()))
+ """
+ )
+
+ def set_batch(self):
+ self.batch = BatchStatement()
+
+ def execute_batch(self):
+ self.session.execute(self.batch)
+
+ def insert_tracing(self, table, traceid, s, tags):
+ self.session.set_keyspace(self.keyspace)
+ if 'upstream_cluster' not in tags:
+ logging.debug('NO UPSTREAM_CLUSTER KEY')
+ tags['upstream_cluster'] = 'none'
+ try:
+ self.batch.add(self.insert_tracing_stmt,
+ (s['spanID'], traceid, s['duration'],
+ s['operationName'], tags['node_id'],
+ tags['http.url'], tags['upstream_cluster']))
+ except Exception as e:
+ logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid,
+ s['duration'], s['operationName'], tags['node_id'],
+ tags['http.url'], tags['upstream_cluster']))
+ logging.debug(e)
+
+ def insert_trace(self, traceid, processes):
+ self.session.set_keyspace(self.keyspace)
+ self.session.execute(
+ """
+ INSERT INTO traces (traceid, processes)
+ VALUES (%s, %s)
+ """,
+ (traceid, processes)
+ )
+
+ def insert_metric(self, m_name, m_value, m_time, service):
+ self.session.set_keyspace(self.keyspace)
+ self.batch.add(self.insert_metric_stmt,
+ (m_name, m_value, m_time, service))
+
+
+def main():
+ cass = CassandraOps(CASSANDRA_HOSTS)
+ cass.init_visibility()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/clover/collector/db/redisops.py b/clover/collector/db/redisops.py
new file mode 100644
index 0000000..e80c417
--- /dev/null
+++ b/clover/collector/db/redisops.py
@@ -0,0 +1,59 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import redis
+import logging
+
+REDIS_HOST = 'redis'
+# REDIS_HOST = '10.244.0.85'
+
+
+class RedisOps:
+
+ def __init__(self, host=REDIS_HOST):
+ logging.basicConfig(filename='redisops.log',
+ level=logging.DEBUG)
+ try:
+ self.r = redis.StrictRedis(host=host, port=6379, db=0)
+ except Exception as e:
+ logging.debug(e)
+
+ def init_services(self, skey='visibility_services'):
+ service_names = ['http_lb', 'proxy_access_control']
+ for s in service_names:
+ self.r.sadd(skey, s)
+
+ def init_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'):
+ metric_prefixes = ['envoy_cluster_out_', 'envoy_cluster_in_']
+ metric_suffixes = [
+ '_default_svc_cluster_local_http_internal_upstream_rq_2xx',
+ '_default_svc_cluster_local_http_upstream_cx_active']
+ for p in metric_prefixes:
+ self.r.sadd(pkey, p)
+ for s in metric_suffixes:
+ self.r.sadd(skey, s)
+
+ def get_services(self, skey='visibility_services'):
+ services = self.r.smembers(skey)
+ return services
+
+ def get_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'):
+ prefixes = self.r.smembers(pkey)
+ suffixes = self.r.smembers(skey)
+ return prefixes, suffixes
+
+
+def main():
+ r = RedisOps()
+ r.init_services()
+ r.init_metrics()
+ r.get_services()
+ r.get_metrics()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/clover/collector/docker/Dockerfile b/clover/collector/docker/Dockerfile
new file mode 100644
index 0000000..1714420
--- /dev/null
+++ b/clover/collector/docker/Dockerfile
@@ -0,0 +1,30 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+FROM opnfv/clover
+
+ENV REPOS_DIR="/home/opnfv/repos"
+
+# Clover repo
+ENV CLOVER_REPO_DIR="${REPOS_DIR}/clover"
+
+# Install required python packages
+RUN python -m pip install cassandra-driver redis
+
+# Set work directory
+WORKDIR ${CLOVER_REPO_DIR}
+
+COPY /process clover/collector/process
+COPY /grpc clover/collector/grpc
+COPY /db clover/collector/db
+COPY __init__.py clover/collector/__init__.py
+
+RUN pip install .
+
+WORKDIR "${CLOVER_REPO_DIR}/clover/collector"
+
+CMD ./process/grpc_process.sh no_schema_init
diff --git a/clover/collector/grpc/__init__.py b/clover/collector/grpc/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/clover/collector/grpc/__init__.py
diff --git a/clover/collector/grpc/build_proto.sh b/clover/collector/grpc/build_proto.sh
new file mode 100755
index 0000000..44467ad
--- /dev/null
+++ b/clover/collector/grpc/build_proto.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+#
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. collector.proto
diff --git a/clover/collector/grpc/collector.proto b/clover/collector/grpc/collector.proto
new file mode 100644
index 0000000..fc8b636
--- /dev/null
+++ b/clover/collector/grpc/collector.proto
@@ -0,0 +1,45 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+syntax = "proto3";
+
+package collector;
+
+// The controller service definition.
+service Controller {
+
+ rpc StopCollector (ConfigCollector) returns (CollectorReply) {}
+ rpc StartCollector (ConfigCollector) returns (CollectorReply) {}
+ rpc InitVisibility (ConfigCassandra) returns (CollectorReply) {}
+ rpc TruncateVisibility (Schemas) returns (CollectorReply) {}
+}
+
+message ConfigCassandra {
+ string cassandra_hosts = 1;
+ int32 cassandra_port = 2;
+}
+
+message ConfigCollector {
+ string t_port = 1;
+ string t_host = 2;
+ string m_port = 3;
+ string m_host = 4;
+ string c_port = 5;
+ string c_hosts = 6;
+ string sinterval = 7;
+}
+
+message Schemas {
+ string schemas = 1;
+ string cassandra_hosts = 2;
+ int32 cassandra_port = 3;
+
+}
+
+message CollectorReply {
+ string message = 1;
+}
diff --git a/clover/collector/grpc/collector_client.py b/clover/collector/grpc/collector_client.py
new file mode 100644
index 0000000..b9e9f67
--- /dev/null
+++ b/clover/collector/grpc/collector_client.py
@@ -0,0 +1,105 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+from __future__ import print_function
+from kubernetes import client, config
+
+import grpc
+import argparse
+import pickle
+
+import collector_pb2
+import collector_pb2_grpc
+
+# This is a basic client script to test server GRPC messaging
+# TODO improve interface overall
+
+
+def run(args, grpc_port='50054'):
+ pod_ip = get_podip('clover-collector')
+ if pod_ip == '':
+ return "Can not find service: {}".format(args['service_name'])
+ collector_grpc = pod_ip + ':' + grpc_port
+ channel = grpc.insecure_channel(collector_grpc)
+ stub = collector_pb2_grpc.ControllerStub(channel)
+
+ if args['cmd'] == 'init':
+ return init_visibility(stub)
+ elif args['cmd'] == 'start':
+ return start_collector(stub)
+ elif args['cmd'] == 'stop':
+ return stop_collector(stub)
+ elif args['cmd'] == 'clean':
+ return clean_visibility(stub)
+ else:
+ return "Invalid command: {}".format(args['cmd'])
+
+
+def get_podip(pod_name):
+ ip = ''
+ if pod_name != '':
+ config.load_kube_config()
+ v1 = client.CoreV1Api()
+ ret = v1.list_pod_for_all_namespaces(watch=False)
+ for i in ret.items:
+ if i.metadata.name.lower().find(pod_name.lower()) != -1:
+ print("Pod IP: {}".format(i.status.pod_ip))
+ ip = i.status.pod_ip
+ return str(ip)
+ return str(ip)
+
+
+def init_visibility(stub):
+ try:
+ cassandra_hosts = pickle.dumps(['cassandra.default'])
+ response = stub.InitVisibility(collector_pb2.ConfigCassandra(
+ cassandra_hosts=cassandra_hosts, cassandra_port=9042))
+ except Exception as e:
+ return e
+ return response.message
+
+
+def clean_visibility(stub):
+ try:
+ cassandra_hosts = pickle.dumps(['cassandra.default'])
+ schemas = pickle.dumps(['spans', 'traces', 'metrics'])
+ response = stub.TruncateVisibility(collector_pb2.Schemas(
+ schemas=schemas, cassandra_hosts=cassandra_hosts,
+ cassandra_port=9042))
+ except Exception as e:
+ return e
+ return response.message
+
+
+def start_collector(stub):
+ try:
+ cassandra_hosts = pickle.dumps(['cassandra.default'])
+ response = stub.StartCollector(collector_pb2.ConfigCollector(
+ t_port='16686', t_host='jaeger-deployment.istio-system',
+ m_port='9090', m_host='prometheus.istio-system',
+ c_port='9042', c_hosts=cassandra_hosts,
+ sinterval='5'))
+ except Exception as e:
+ return e
+ return response.message
+
+
+def stop_collector(stub):
+ try:
+ response = stub.StopCollector(collector_pb2.ConfigCollector())
+ except Exception as e:
+ return e
+ return response.message
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--cmd', required=True,
+ help='Command to execute in collector')
+ args = parser.parse_args()
+ print(run(vars(args)))
diff --git a/clover/collector/grpc/collector_pb2.py b/clover/collector/grpc/collector_pb2.py
new file mode 100644
index 0000000..f67c880
--- /dev/null
+++ b/clover/collector/grpc/collector_pb2.py
@@ -0,0 +1,300 @@
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: collector.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf import descriptor_pb2
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+ name='collector.proto',
+ package='collector',
+ syntax='proto3',
+ serialized_pb=_b('\n\x0f\x63ollector.proto\x12\tcollector\"B\n\x0f\x43onfigCassandra\x12\x17\n\x0f\x63\x61ssandra_hosts\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x61ssandra_port\x18\x02 \x01(\x05\"\x85\x01\n\x0f\x43onfigCollector\x12\x0e\n\x06t_port\x18\x01 \x01(\t\x12\x0e\n\x06t_host\x18\x02 \x01(\t\x12\x0e\n\x06m_port\x18\x03 \x01(\t\x12\x0e\n\x06m_host\x18\x04 \x01(\t\x12\x0e\n\x06\x63_port\x18\x05 \x01(\t\x12\x0f\n\x07\x63_hosts\x18\x06 \x01(\t\x12\x11\n\tsinterval\x18\x07 \x01(\t\"K\n\x07Schemas\x12\x0f\n\x07schemas\x18\x01 \x01(\t\x12\x17\n\x0f\x63\x61ssandra_hosts\x18\x02 \x01(\t\x12\x16\n\x0e\x63\x61ssandra_port\x18\x03 \x01(\x05\"!\n\x0e\x43ollectorReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\xb3\x02\n\nController\x12H\n\rStopCollector\x12\x1a.collector.ConfigCollector\x1a\x19.collector.CollectorReply\"\x00\x12I\n\x0eStartCollector\x12\x1a.collector.ConfigCollector\x1a\x19.collector.CollectorReply\"\x00\x12I\n\x0eInitVisibility\x12\x1a.collector.ConfigCassandra\x1a\x19.collector.CollectorReply\"\x00\x12\x45\n\x12TruncateVisibility\x12\x12.collector.Schemas\x1a\x19.collector.CollectorReply\"\x00\x62\x06proto3')
+)
+
+
+
+
+_CONFIGCASSANDRA = _descriptor.Descriptor(
+ name='ConfigCassandra',
+ full_name='collector.ConfigCassandra',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='cassandra_hosts', full_name='collector.ConfigCassandra.cassandra_hosts', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='cassandra_port', full_name='collector.ConfigCassandra.cassandra_port', index=1,
+ number=2, type=5, cpp_type=1, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=30,
+ serialized_end=96,
+)
+
+
+_CONFIGCOLLECTOR = _descriptor.Descriptor(
+ name='ConfigCollector',
+ full_name='collector.ConfigCollector',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='t_port', full_name='collector.ConfigCollector.t_port', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='t_host', full_name='collector.ConfigCollector.t_host', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='m_port', full_name='collector.ConfigCollector.m_port', index=2,
+ number=3, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='m_host', full_name='collector.ConfigCollector.m_host', index=3,
+ number=4, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='c_port', full_name='collector.ConfigCollector.c_port', index=4,
+ number=5, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='c_hosts', full_name='collector.ConfigCollector.c_hosts', index=5,
+ number=6, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='sinterval', full_name='collector.ConfigCollector.sinterval', index=6,
+ number=7, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=99,
+ serialized_end=232,
+)
+
+
+_SCHEMAS = _descriptor.Descriptor(
+ name='Schemas',
+ full_name='collector.Schemas',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='schemas', full_name='collector.Schemas.schemas', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='cassandra_hosts', full_name='collector.Schemas.cassandra_hosts', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='cassandra_port', full_name='collector.Schemas.cassandra_port', index=2,
+ number=3, type=5, cpp_type=1, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=234,
+ serialized_end=309,
+)
+
+
+_COLLECTORREPLY = _descriptor.Descriptor(
+ name='CollectorReply',
+ full_name='collector.CollectorReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='message', full_name='collector.CollectorReply.message', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=311,
+ serialized_end=344,
+)
+
+DESCRIPTOR.message_types_by_name['ConfigCassandra'] = _CONFIGCASSANDRA
+DESCRIPTOR.message_types_by_name['ConfigCollector'] = _CONFIGCOLLECTOR
+DESCRIPTOR.message_types_by_name['Schemas'] = _SCHEMAS
+DESCRIPTOR.message_types_by_name['CollectorReply'] = _COLLECTORREPLY
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+ConfigCassandra = _reflection.GeneratedProtocolMessageType('ConfigCassandra', (_message.Message,), dict(
+ DESCRIPTOR = _CONFIGCASSANDRA,
+ __module__ = 'collector_pb2'
+ # @@protoc_insertion_point(class_scope:collector.ConfigCassandra)
+ ))
+_sym_db.RegisterMessage(ConfigCassandra)
+
+ConfigCollector = _reflection.GeneratedProtocolMessageType('ConfigCollector', (_message.Message,), dict(
+ DESCRIPTOR = _CONFIGCOLLECTOR,
+ __module__ = 'collector_pb2'
+ # @@protoc_insertion_point(class_scope:collector.ConfigCollector)
+ ))
+_sym_db.RegisterMessage(ConfigCollector)
+
+Schemas = _reflection.GeneratedProtocolMessageType('Schemas', (_message.Message,), dict(
+ DESCRIPTOR = _SCHEMAS,
+ __module__ = 'collector_pb2'
+ # @@protoc_insertion_point(class_scope:collector.Schemas)
+ ))
+_sym_db.RegisterMessage(Schemas)
+
+CollectorReply = _reflection.GeneratedProtocolMessageType('CollectorReply', (_message.Message,), dict(
+ DESCRIPTOR = _COLLECTORREPLY,
+ __module__ = 'collector_pb2'
+ # @@protoc_insertion_point(class_scope:collector.CollectorReply)
+ ))
+_sym_db.RegisterMessage(CollectorReply)
+
+
+
+_CONTROLLER = _descriptor.ServiceDescriptor(
+ name='Controller',
+ full_name='collector.Controller',
+ file=DESCRIPTOR,
+ index=0,
+ options=None,
+ serialized_start=347,
+ serialized_end=654,
+ methods=[
+ _descriptor.MethodDescriptor(
+ name='StopCollector',
+ full_name='collector.Controller.StopCollector',
+ index=0,
+ containing_service=None,
+ input_type=_CONFIGCOLLECTOR,
+ output_type=_COLLECTORREPLY,
+ options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='StartCollector',
+ full_name='collector.Controller.StartCollector',
+ index=1,
+ containing_service=None,
+ input_type=_CONFIGCOLLECTOR,
+ output_type=_COLLECTORREPLY,
+ options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='InitVisibility',
+ full_name='collector.Controller.InitVisibility',
+ index=2,
+ containing_service=None,
+ input_type=_CONFIGCASSANDRA,
+ output_type=_COLLECTORREPLY,
+ options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='TruncateVisibility',
+ full_name='collector.Controller.TruncateVisibility',
+ index=3,
+ containing_service=None,
+ input_type=_SCHEMAS,
+ output_type=_COLLECTORREPLY,
+ options=None,
+ ),
+])
+_sym_db.RegisterServiceDescriptor(_CONTROLLER)
+
+DESCRIPTOR.services_by_name['Controller'] = _CONTROLLER
+
+# @@protoc_insertion_point(module_scope)
diff --git a/clover/collector/grpc/collector_pb2_grpc.py b/clover/collector/grpc/collector_pb2_grpc.py
new file mode 100644
index 0000000..a7be73c
--- /dev/null
+++ b/clover/collector/grpc/collector_pb2_grpc.py
@@ -0,0 +1,97 @@
+# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+import grpc
+
+import collector_pb2 as collector__pb2
+
+
+class ControllerStub(object):
+ """The controller service definition.
+ """
+
+ def __init__(self, channel):
+ """Constructor.
+
+ Args:
+ channel: A grpc.Channel.
+ """
+ self.StopCollector = channel.unary_unary(
+ '/collector.Controller/StopCollector',
+ request_serializer=collector__pb2.ConfigCollector.SerializeToString,
+ response_deserializer=collector__pb2.CollectorReply.FromString,
+ )
+ self.StartCollector = channel.unary_unary(
+ '/collector.Controller/StartCollector',
+ request_serializer=collector__pb2.ConfigCollector.SerializeToString,
+ response_deserializer=collector__pb2.CollectorReply.FromString,
+ )
+ self.InitVisibility = channel.unary_unary(
+ '/collector.Controller/InitVisibility',
+ request_serializer=collector__pb2.ConfigCassandra.SerializeToString,
+ response_deserializer=collector__pb2.CollectorReply.FromString,
+ )
+ self.TruncateVisibility = channel.unary_unary(
+ '/collector.Controller/TruncateVisibility',
+ request_serializer=collector__pb2.Schemas.SerializeToString,
+ response_deserializer=collector__pb2.CollectorReply.FromString,
+ )
+
+
+class ControllerServicer(object):
+ """The controller service definition.
+ """
+
+ def StopCollector(self, request, context):
+ # missing associated documentation comment in .proto file
+ pass
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def StartCollector(self, request, context):
+ # missing associated documentation comment in .proto file
+ pass
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def InitVisibility(self, request, context):
+ # missing associated documentation comment in .proto file
+ pass
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def TruncateVisibility(self, request, context):
+ # missing associated documentation comment in .proto file
+ pass
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+
+def add_ControllerServicer_to_server(servicer, server):
+ rpc_method_handlers = {
+ 'StopCollector': grpc.unary_unary_rpc_method_handler(
+ servicer.StopCollector,
+ request_deserializer=collector__pb2.ConfigCollector.FromString,
+ response_serializer=collector__pb2.CollectorReply.SerializeToString,
+ ),
+ 'StartCollector': grpc.unary_unary_rpc_method_handler(
+ servicer.StartCollector,
+ request_deserializer=collector__pb2.ConfigCollector.FromString,
+ response_serializer=collector__pb2.CollectorReply.SerializeToString,
+ ),
+ 'InitVisibility': grpc.unary_unary_rpc_method_handler(
+ servicer.InitVisibility,
+ request_deserializer=collector__pb2.ConfigCassandra.FromString,
+ response_serializer=collector__pb2.CollectorReply.SerializeToString,
+ ),
+ 'TruncateVisibility': grpc.unary_unary_rpc_method_handler(
+ servicer.TruncateVisibility,
+ request_deserializer=collector__pb2.Schemas.FromString,
+ response_serializer=collector__pb2.CollectorReply.SerializeToString,
+ ),
+ }
+ generic_handler = grpc.method_handlers_generic_handler(
+ 'collector.Controller', rpc_method_handlers)
+ server.add_generic_rpc_handlers((generic_handler,))
diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py
new file mode 100644
index 0000000..c2eb221
--- /dev/null
+++ b/clover/collector/grpc/collector_server.py
@@ -0,0 +1,98 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+
+from concurrent import futures
+from clover.collector.db.cassops import CassandraOps
+import time
+import sys
+import grpc
+import subprocess
+import pickle
+import logging
+import collector_pb2
+import collector_pb2_grpc
+
+
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+GRPC_PORT = '[::]:50054'
+
+
+class Controller(collector_pb2_grpc.ControllerServicer):
+
+ def __init__(self, init_visibility):
+ logging.basicConfig(filename='collector_server.log',
+ level=logging.DEBUG)
+ self.collector = 0
+ if init_visibility == 'set_schemas':
+ cassandra_hosts = pickle.dumps(['cassandra.default'])
+ self.InitVisibility(collector_pb2.ConfigCassandra(
+ cassandra_port=9042, cassandra_hosts=cassandra_hosts), "")
+
+ def StopCollector(self, r, context):
+ try:
+ subprocess.Popen.kill(self.collector)
+ msg = "Stopped collector on pid: {}".format(self.collector.pid)
+ except Exception as e:
+ logging.debug(e)
+ msg = "Failed to stop collector"
+ return collector_pb2.CollectorReply(message=msg)
+
+ def StartCollector(self, r, context):
+ try:
+ self.collector = subprocess.Popen(
+ ["python", "process/collect.py",
+ "-sinterval={}".format(r.sinterval),
+ "-c_port={}".format(r.c_port),
+ "-t_port={}".format(r.t_port), "-t_host={}".format(r.t_host),
+ "-m_port={}".format(r.m_port), "-m_host={}".format(r.m_host),
+ "-c_hosts={}".format(pickle.loads(r.c_hosts)), "&"],
+ shell=False)
+ msg = "Started collector on pid: {}".format(self.collector.pid)
+ except Exception as e:
+ logging.debug(e)
+ msg = e
+ return collector_pb2.CollectorReply(message=msg)
+
+ def InitVisibility(self, r, context):
+ try:
+ cass = CassandraOps(pickle.loads(r.cassandra_hosts),
+ r.cassandra_port)
+ cass.init_visibility()
+ msg = "Added visibility schemas in cassandra"
+ except Exception as e:
+ logging.debug(e)
+ msg = "Failed to initialize cassandra"
+ return collector_pb2.CollectorReply(message=msg)
+
+ def TruncateVisibility(self, r, context):
+ try:
+ cass = CassandraOps(pickle.loads(r.cassandra_hosts),
+ r.cassandra_port)
+ cass.truncate(pickle.loads(r.schemas))
+ msg = "Truncated visibility tables"
+ except Exception as e:
+ logging.debug(e)
+ msg = "Failed to truncate visibility"
+ return collector_pb2.CollectorReply(message=msg)
+
+
+def serve(init_visibility):
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ collector_pb2_grpc.add_ControllerServicer_to_server(
+ Controller(init_visibility), server)
+ server.add_insecure_port(GRPC_PORT)
+ server.start()
+ try:
+ while True:
+ time.sleep(_ONE_DAY_IN_SECONDS)
+ except KeyboardInterrupt:
+ server.stop(0)
+
+
+if __name__ == '__main__':
+ serve(sys.argv[1])
diff --git a/clover/collector/process/__init__.py b/clover/collector/process/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/clover/collector/process/__init__.py
diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py
new file mode 100644
index 0000000..d8beb49
--- /dev/null
+++ b/clover/collector/process/collect.py
@@ -0,0 +1,162 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+from clover.tracing.tracing import Tracing
+from clover.monitoring.monitoring import Monitoring
+from clover.collector.db.cassops import CassandraOps
+from clover.collector.db.redisops import RedisOps
+
+# import pprint
+import time
+import argparse
+import logging
+import ast
+
+TRACING_SERVICES = ['istio-ingress']
+TRACING_PORT = "16686"
+MONITORING_PORT = "9090"
+CASSANDRA_PORT = 9042 # Provide as integer
+MONITORING_HOST = "prometheus.istio-system"
+TRACING_HOST = "jaeger-deployment.istio-system"
+CASSANDRA_HOSTS = ['cassandra.default']
+
+
+class Collector:
+
+ def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts):
+ logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+ try:
+ self.t = Tracing(t_host, t_port, '', False)
+ monitoring_url = "http://{}:{}".format(m_host, m_port)
+ self.m = Monitoring(monitoring_url)
+ self.c = CassandraOps(c_hosts, int(c_port))
+ self.c.set_prepared()
+ self.r = RedisOps()
+ except Exception as e:
+ logging.debug(e)
+
+ # Toplevel tracing retrieval and batch insert
+ def get_tracing(self, services, time_back=20):
+ self.c.set_batch()
+ for service in services:
+ traces = self.t.getTraces(service, time_back)
+ try:
+ self.set_tracing(traces)
+ except Exception as e:
+ logging.debug(e)
+ self.c.execute_batch()
+
+ # Insert to cassandra visibility traces and spans tables
+ def set_tracing(self, trace):
+ for traces in trace['data']:
+ for spans in traces['spans']:
+ span = {}
+ span['spanID'] = spans['spanID']
+ span['duration'] = spans['duration']
+ span['startTime'] = spans['startTime']
+ span['operationName'] = spans['operationName']
+ tag = {}
+ for tags in spans['tags']:
+ tag[tags['key']] = tags['value']
+ self.c.insert_tracing('spans', traces['traceID'],
+ span, tag)
+ process_list = []
+ for p in traces['processes']:
+ process_list.append(p)
+ service_names = []
+ for pname in process_list:
+ service_names.append(traces['processes'][pname]['serviceName'])
+ self.c.insert_trace(traces['traceID'], service_names)
+
+ # Insert to cassandra visibility metrics table
+ def get_monitoring(self):
+
+ # Fetch collector service/metric lists from redis
+ service_names = self.r.get_services()
+ metric_prefixes, metric_suffixes = self.r.get_metrics()
+
+ self.c.set_batch()
+ for sname in service_names:
+ for prefix in metric_prefixes:
+ for suffix in metric_suffixes:
+ try:
+ metric_name = prefix + sname + suffix
+ query_params = {
+ "type": "instant",
+ "query": metric_name
+ }
+ data = self.m.query(query_params)
+ m_value = data['data']['result'][0]['value'][1]
+ m_time = data['data']['result'][0]['value'][0]
+ mn = data['data']['result'][0]['metric']['__name__']
+ self.c.insert_metric(mn, m_value, str(m_time), sname)
+ except Exception as e:
+ logging.debug(e)
+ self.c.execute_batch()
+
+ # TODO add batch retrieval for monitoring metrics
+ # query_range_param = {
+ # "type": "range",
+ # "query": "tbd",
+ # "start": "60m",
+ # "end": "5m",
+ # "step": "30s"
+ # }
+ # data = self.m.query(query_range_param)
+ # pp = pprint.PrettyPrinter(indent=2)
+ # pp.pprint(data)
+
+
+def main(args):
+ if isinstance(args['c_hosts'], basestring):
+ ch = ast.literal_eval(args['c_hosts'])
+ else:
+ ch = args['c_hosts']
+
+ c = Collector(args['t_port'], args['t_host'], args['m_port'],
+ args['m_host'], args['c_port'], ch)
+
+ # Collector loop
+ loop = True
+ while loop:
+ try:
+ c.get_tracing(args['t_services'])
+ c.get_monitoring()
+ time.sleep(int(args['sinterval']))
+ except KeyboardInterrupt:
+ loop = False
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '-sinterval', default=5,
+ help='Sample interval for collector loop')
+ parser.add_argument(
+ '-t_port', default=TRACING_PORT,
+ help='Port to access Jaeger tracing')
+ parser.add_argument(
+ '-m_port', default=MONITORING_PORT,
+ help='Port to access Prometheus monitoring')
+ parser.add_argument(
+ '-t_host', default=TRACING_HOST,
+ help='Host to access Jaeger tracing')
+ parser.add_argument(
+ '-m_host', default=MONITORING_HOST,
+ help='Host to access Prometheus monitoring')
+ parser.add_argument(
+ '-c_hosts', default=CASSANDRA_HOSTS,
+ help='Host(s) to access Cassandra cluster')
+ parser.add_argument(
+ '-c_port', default=CASSANDRA_PORT,
+ help='Port to access Cassandra cluster')
+ parser.add_argument(
+ '-t_services', default=TRACING_SERVICES,
+ help='Collect services on this list of services')
+
+ args, unknown = parser.parse_known_args()
+ print(main(vars(args)))
diff --git a/clover/collector/process/grpc_process.sh b/clover/collector/process/grpc_process.sh
new file mode 100755
index 0000000..30e0171
--- /dev/null
+++ b/clover/collector/process/grpc_process.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+#
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+python grpc/collector_server.py test1
diff --git a/clover/collector/yaml/manifest.template b/clover/collector/yaml/manifest.template
new file mode 100644
index 0000000..c7aa3e7
--- /dev/null
+++ b/clover/collector/yaml/manifest.template
@@ -0,0 +1,43 @@
+---
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+ name: {{ deploy_name }}
+ labels:
+ app: {{ deploy_name }}
+spec:
+ template:
+ metadata:
+ labels:
+ app: {{ deploy_name }}
+ spec:
+ containers:
+ - name: {{ deploy_name }}
+ image: {{ image_path }}/{{ image_name }}:{{ image_tag }}
+ ports:
+ - containerPort: {{ grpc_port }}
+ - containerPort: {{ redis_port }}
+ - containerPort: {{ monitor_port }}
+ - containerPort: {{ trace_port }}
+ - containerPort: {{ cass_port }}
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: {{ deploy_name }}
+ labels:
+ app: {{ deploy_name }}
+spec:
+ ports:
+ - port: {{ grpc_port }}
+ name: grpc
+ - port: {{ redis_port }}
+ name: redis
+ - port: {{ trace_port }}
+ name: jaeger-deployment
+ - port: {{ monitor_port }}
+ name: prometheus
+ - port: {{ cass_port }}
+ name: cassandra
+ selector:
+ app: {{ deploy_name }}
diff --git a/clover/collector/yaml/render_yaml.py b/clover/collector/yaml/render_yaml.py
new file mode 100644
index 0000000..c1d8be7
--- /dev/null
+++ b/clover/collector/yaml/render_yaml.py
@@ -0,0 +1,73 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import argparse
+
+from jinja2 import Template
+
+
+def render_yaml(args):
+ template_file = 'manifest.template'
+ out_file = args['deploy_name'] + '.yaml'
+
+ try:
+ with open(template_file) as f:
+ tmpl = Template(f.read())
+ output = tmpl.render(
+ image_path=args['image_path'],
+ image_name=args['image_name'],
+ image_tag=args['image_tag'],
+ deploy_name=args['deploy_name'],
+ grpc_port=args['grpc_port'],
+ monitor_port=args['monitor_port'],
+ redis_port=args['redis_port'],
+ cass_port=args['cass_port'],
+ trace_port=args['trace_port']
+ )
+ with open(out_file, "wb") as fh:
+ fh.write(output)
+ return "Generated manifest for {}".format(args['deploy_name'])
+ except Exception as e:
+ print(e)
+ return "Unable to generate manifest for {}".format(
+ args['deploy_name'])
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--image_name', default='clover-collector',
+ help='The image name to use')
+ parser.add_argument(
+ # '--image_path', default='opnfv',
+ '--image_path', default='localhost:5000',
+ help='The path to the image to use')
+ parser.add_argument(
+ # '--image_tag', default='opnfv-6.0.0',
+ '--image_tag', default='latest',
+ help='The image tag to use')
+ parser.add_argument(
+ '--deploy_name', default='clover-collector',
+ help='The k8s deploy name to use')
+ parser.add_argument(
+ '--redis_port', default='6379',
+ help='The redis port to connect for management')
+ parser.add_argument(
+ '--monitor_port', default='9090',
+ help='The Prometheus monitoring port')
+ parser.add_argument(
+ '--grpc_port', default='50054',
+ help='The GRPC server port for collector management')
+ parser.add_argument(
+ '--trace_port', default='16686',
+ help='The Jaeger tracing port')
+ parser.add_argument(
+ '--cass_port', default='9042',
+ help='The Cassandra port')
+
+ args = parser.parse_args()
+ print(render_yaml(vars(args)))
diff --git a/clover/monitoring/monitoring.py b/clover/monitoring/monitoring.py
index 9726fd1..ec97e82 100644
--- a/clover/monitoring/monitoring.py
+++ b/clover/monitoring/monitoring.py
@@ -90,8 +90,9 @@ class Monitoring(object):
print("query %s %s, status=%s, size=%d, dur=%.3f" % \
(self.host, query_params["query"], resp.status_code, len(resp.text), dur))
- pp = pprint.PrettyPrinter(indent=2)
- pp.pprint(resp.json())
+ #pp = pprint.PrettyPrinter(indent=2)
+ ##pp.pprint(resp.json())
+ return resp.json()
except Exception as e:
print("ERROR: Could not query prometheus instance %s. \n %s" % (url, e))