author     Bryan Sullivan <bryan.sullivan@att.com>   2017-11-15 16:27:17 -0800
committer  Bryan Sullivan <bryan.sullivan@att.com>   2017-11-19 20:48:54 -0800
commit     4b0b335d54946cbb202dfdf0545d499cf559faaa (patch)
tree       2fa112f525c071a08881e0ec6bee7c3b6c1ea674 /tools
parent     c40be26f0586fa931c986e2eea40477c524d381d (diff)
Updated monitor.py and grafana dashboard
JIRA: VES-2
Various fixes in ves-setup
Add demo_deploy.sh
Refactored to containerized agent and collector
Change-Id: I8851465742aaf40a4cce265508a3d2d66abced08
Signed-off-by: Bryan Sullivan <bryan.sullivan@att.com>
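For context, the whole deployment is driven from the MAAS server. A quick-start sketch based on the usage notes in the new demo_deploy.sh (the node IPs and key path below are placeholders):

    git clone https://gerrit.opnfv.org/gerrit/ves ~/ves
    # master first: installs Kafka plus the agent, collector, InfluxDB, and
    # Grafana containers, and copies the generated ves_env.sh back to /tmp/ves
    bash ~/ves/tools/demo_deploy.sh master 10.0.0.10 ~/.ssh/id_rsa
    # then each worker: installs collectd, publishing events to the master's Kafka
    bash ~/ves/tools/demo_deploy.sh worker 10.0.0.11 ~/.ssh/id_rsa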
Diffstat (limited to 'tools')
-rw-r--r--  tools/demo_deploy.sh          75
-rw-r--r--  tools/grafana/Dashboard.json  763
-rw-r--r--  tools/monitor.py              765
-rw-r--r--  tools/ves-setup.sh            279
4 files changed, 1760 insertions(+), 122 deletions(-)
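As background for the monitor.py changes below: the collector decodes each VES event and republishes selected fields to InfluxDB as line protocol, POSTed to /write?db=veseventsdb. A minimal sketch of an equivalent write, reusing the field names from the script's sample payloads (the host and values are illustrative):

    # line protocol: <measurement>,<tag>=<val> <field>=<val>[,<field>=<val>...]
    curl -i -X POST 'http://127.0.0.1:8086/write?db=veseventsdb' \
      -H 'Content-Type: text/plain' \
      --data-binary 'memoryUsage,system=opnfv01 memoryFree=244731658240,memoryUsed=6240064'
    # monitor.py treats any status other than HTTP 204 as a failed save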
diff --git a/tools/demo_deploy.sh b/tools/demo_deploy.sh
new file mode 100644
index 0000000..b0b76e8
--- /dev/null
+++ b/tools/demo_deploy.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+# Copyright 2017 AT&T Intellectual Property, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#. What this is: Complete scripted deployment of the VES monitoring framework
+# When complete, the following will be installed:
+#. - On the specified master node, a Kafka server and containers running the
+#   OPNFV Barometer VES agent, OPNFV VES collector, InfluxDB, and Grafana
+#. - On each specified worker node, collectd configured per OPNFV Barometer
+#.
+#. Prerequisites:
+#. - Ubuntu server for kubernetes cluster nodes (master and worker nodes)
+#. - MAAS server as cluster admin for kubernetes master/worker nodes
+#. - Password-less ssh key provided for node setup
+#. - hostname of kubernetes master setup in DNS or /etc/hosts
+#. Usage: on the MAAS server
+#. $ git clone https://gerrit.opnfv.org/gerrit/ves ~/ves
+#. $ bash ~/ves/tools/demo_deploy.sh master <node> <key>
+#.   master: setup VES on k8s master
+#.   <node>: IP of cluster master node
+#.   <key>: SSH key enabling password-less SSH to nodes
+#. $ bash ~/ves/tools/demo_deploy.sh worker <node> <key>
+#.   worker: setup VES on k8s worker
+#.   <node>: IP of worker node
+#.   <key>: SSH key enabling password-less SSH to nodes
+
+node=$2
+key=$3
+
+eval `ssh-agent`
+ssh-add $key
+if [[ "$1" == "master" ]]; then
+  echo; echo "$0 $(date): Setting up master node"
+  scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+    ~/ves ubuntu@$node:/tmp
+  ssh -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$node <<EOF
+ves_host="$master"
+export ves_host
+ves_mode="guest"
+export ves_mode
+ves_user="hello"
+export ves_user
+ves_pass="world"
+export ves_pass
+ves_kafka_host="$node"
+export ves_kafka_host
+bash /tmp/ves/tools/ves-setup.sh collector
+bash /tmp/ves/tools/ves-setup.sh kafka
+bash /tmp/ves/tools/ves-setup.sh collectd
+bash /tmp/ves/tools/ves-setup.sh agent
+EOF
+  mkdir /tmp/ves
+  scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+    ubuntu@$node:/tmp/ves/ves_env.sh /tmp/ves/.
+  echo "VES Grafana dashboards are available at http://$node:3001 (login as admin/admin)"
+else
+  echo; echo "$0 $(date): Setting up collectd at $node"
+  scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+    ~/ves ubuntu@$node:/tmp
+  scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+    /tmp/ves/ves_env.sh ubuntu@$node:/tmp/ves/.
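+  # note: the ves_env.sh copied here was pulled back from the master node by
+  # the "master" run above, so workers reuse the same VES environment settings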
+ ssh -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \ + ubuntu@$node bash /tmp/ves/tools/ves-setup.sh collectd +fi diff --git a/tools/grafana/Dashboard.json b/tools/grafana/Dashboard.json new file mode 100644 index 0000000..d59e5da --- /dev/null +++ b/tools/grafana/Dashboard.json @@ -0,0 +1,763 @@ +{ +"dashboard": { + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "hideControls": false, + "id": null, + "links": [], + "refresh": "10s", + "rows": [ + { + "collapse": false, + "height": 273, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "VESEvents", + "fill": 1, + "grid": { + "leftLogBase": 1, + "leftMax": null, + "leftMin": null, + "rightLogBase": 1, + "rightMax": null, + "rightMin": null + }, + "id": 3, + "interval": "30s", + "legend": { + "alignAsTable": false, + "avg": true, + "current": true, + "max": true, + "min": true, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 6, + "stack": false, + "steppedLine": false, + "targets": [ + { + "alias": "CpuUser", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + } + ], + "measurement": "load", + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT mean(\"cpuuser\") FROM \"cpu\" WHERE \"system\" = 'computehost' AND $timeFilter GROUP BY time(1m) fill(null)", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "load-longterm" + ], + "type": "field" + } + ] + ], + "tags": [] + }, + { + "alias": "CpuSystem", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + } + ], + "measurement": "load", + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT mean(\"cpusystem\") FROM \"cpu\" WHERE $timeFilter GROUP BY time(1m) fill(null)", + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "load-shortterm" + ], + "type": "field" + } + ] + ], + "tags": [ + { + "key": "system", + "operator": "=", + "value": "OPNFV01" + } + ] + }, + { + "alias": "CpuIdle", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + } + ], + "measurement": "load", + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT mean(\"cpuidle\") FROM \"cpu\" WHERE $timeFilter GROUP BY time(1m) fill(null)", + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "load-midterm" + ], + "type": "field" + } + ] + ], + "tags": [ + { + "key": "system", + "operator": "=", + "value": "OPNFV01" + } + ] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "host load", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "x-axis": true, + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "y-axis": true, + "y_formats": [ + "short", + "short" + ], + "yaxes": [ + { + "format": "short", + "label": "Percent", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "VESEvents", + "fill": 1, + 
"id": 5, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 6, + "stack": false, + "steppedLine": false, + "targets": [ + { + "alias": "Free", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + } + ], + "measurement": "memoryUsage", + "orderByTime": "ASC", + "policy": "autogen", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "memoryFree" + ], + "type": "field" + } + ] + ], + "tags": [] + }, + { + "alias": "Used", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + } + ], + "measurement": "memoryUsage", + "orderByTime": "ASC", + "policy": "autogen", + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "memoryUsed" + ], + "type": "field" + } + ] + ], + "tags": [] + }, + { + "alias": "Buffered", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + } + ], + "measurement": "memoryUsage", + "orderByTime": "ASC", + "policy": "autogen", + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "memoryBuffered" + ], + "type": "field" + } + ] + ], + "tags": [] + }, + { + "alias": "Cached", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + } + ], + "measurement": "memoryUsage", + "orderByTime": "ASC", + "policy": "autogen", + "refId": "D", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "memoryCached" + ], + "type": "field" + } + ] + ], + "tags": [] + }, + { + "alias": "SlabRecl", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + } + ], + "measurement": "memoryUsage", + "orderByTime": "ASC", + "policy": "autogen", + "refId": "E", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "memorySlabRecl" + ], + "type": "field" + } + ] + ], + "tags": [] + }, + { + "alias": "SlabUnrecl", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + } + ], + "measurement": "memoryUsage", + "orderByTime": "ASC", + "policy": "autogen", + "refId": "F", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "memorySlabUnrecl" + ], + "type": "field" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Memory", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + } + ], + "repeat": null, + "repeatIteration": null, + "repeatRowId": null, + "showTitle": false, + "title": "Dashboard Row", + "titleSize": "h6" + }, + { + "collapse": false, + "height": 250, + "panels": [ + { + "alert": { + "conditions": [ + { + "evaluator": { + "params": [ + 15000000 + ], + "type": "gt" + }, + "operator": { + "type": "and" + }, + "query": { + "params": [ + "A", + "1s", + "now" + ] + }, + "reducer": { + "params": [], + "type": 
"avg" + }, + "type": "query" + } + ], + "executionErrorState": "alerting", + "frequency": "1s", + "handler": 1, + "message": "Transmitted Traffic Exceeded limits\nClosed Loop Action:Apply Firewall Rules", + "name": "VES webserver_1 Network Usage alert", + "noDataState": "no_data", + "notifications": [] + }, + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "VESEvents", + "fill": 1, + "grid": { + "leftLogBase": 1, + "leftMax": null, + "leftMin": null, + "rightLogBase": 1, + "rightMax": null, + "rightMin": null + }, + "id": 2, + "interval": "30s", + "legend": { + "alignAsTable": false, + "avg": true, + "current": false, + "max": false, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 6, + "stack": false, + "steppedLine": false, + "targets": [ + { + "alias": "rxOctets", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + }, + { + "params": [ + "vnic" + ], + "type": "tag" + } + ], + "measurement": "vNicPerformance", + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT derivative(mean(\"rxoctetsacc\"), 10s) FROM \"vnic\" WHERE \"system\" = 'computehost' AND $timeFilter GROUP BY time(1m) fill(null)", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "receivedOctetsAccumulated" + ], + "type": "field" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Received Octets", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "x-axis": true, + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "y-axis": true, + "y_formats": [ + "short", + "short" + ], + "yaxes": [ + { + "format": "short", + "label": "Octets/Packets", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "VESEvents", + "fill": 1, + "grid": { + "leftLogBase": 1, + "leftMax": null, + "leftMin": null, + "rightLogBase": 1, + "rightMax": null, + "rightMin": null + }, + "id": 4, + "interval": "30s", + "legend": { + "alignAsTable": false, + "avg": true, + "current": false, + "max": false, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 6, + "stack": false, + "steppedLine": false, + "targets": [ + { + "alias": "txOctets", + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "system" + ], + "type": "tag" + }, + { + "params": [ + "vnic" + ], + "type": "tag" + } + ], + "measurement": "vNicPerformance", + "orderByTime": "ASC", + "policy": "default", + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "transmittedOctetsAccumulated" + ], + "type": "field" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Transmitted Octets", + 
"tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "x-axis": true, + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "y-axis": true, + "y_formats": [ + "short", + "short" + ], + "yaxes": [ + { + "format": "short", + "label": "Octets/Packets", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + } + ], + "repeat": null, + "repeatIteration": null, + "repeatRowId": null, + "showTitle": false, + "title": "Dashboard Row", + "titleSize": "h6" + } + ], + "schemaVersion": 14, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-30m", + "to": "now" + }, + "timepicker": { + "now": true, + "refresh_intervals": [ + "10s", + "20s", + "30s", + "1m" + ], + "time_options": [ + "5m", + "15m", + "1h", + "6h", + "12h", + "24h", + "2d", + "7d", + "30d" + ] + }, + "timezone": "browser", + "title": "VES Demo", + "version": 3 +} +} diff --git a/tools/monitor.py b/tools/monitor.py new file mode 100644 index 0000000..91c0eae --- /dev/null +++ b/tools/monitor.py @@ -0,0 +1,765 @@ +#!/usr/bin/env python +# +# Copyright 2016-2017 AT&T Intellectual Property, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# What this is: Monitor and closed-loop policy agent as part of the OPNFV VES +# ves_onap_demo. +# +# Status: this is a work in progress, under test. + +from rest_dispatcher import PathDispatcher, set_404_content +from wsgiref.simple_server import make_server +import sys +import os +import platform +import traceback +import time +from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter +import ConfigParser +import logging.handlers +from base64 import b64decode +import string +import json +import jsonschema +from functools import partial +import requests + +monitor_mode = "f" +vdu_id = ['','','','','',''] +summary_e = ['***** Summary of key stats *****','','',''] +summary_c = ['Collectd agents:'] +status = ['','Started','Started','Started'] +base_url = '' +template_404 = b'''POST {0}''' +columns = 0 +rows = 0 + +class JSONObject: + def __init__(self, d): + self.__dict__ = d + +__all__ = [] +__version__ = 0.1 +__date__ = '2015-12-04' +__updated__ = '2015-12-04' + +TESTRUN = False +DEBUG = False +PROFILE = False + +#------------------------------------------------------------------------------ +# Address of influxdb server. +#------------------------------------------------------------------------------ + +influxdb = '127.0.0.1' + +#------------------------------------------------------------------------------ +# Credentials we expect clients to authenticate themselves with. +#------------------------------------------------------------------------------ +vel_username = '' +vel_password = '' + +#------------------------------------------------------------------------------ +# The JSON schema which we will use to validate events. 
+#------------------------------------------------------------------------------ +vel_schema = None + +#------------------------------------------------------------------------------ +# The JSON schema which we will use to validate client throttle state. +#------------------------------------------------------------------------------ +throttle_schema = None + +#------------------------------------------------------------------------------ +# The JSON schema which we will use to provoke throttling commands for testing. +#------------------------------------------------------------------------------ +test_control_schema = None + +#------------------------------------------------------------------------------ +# Pending command list from the testControl API +# This is sent as a response commandList to the next received event. +#------------------------------------------------------------------------------ +pending_command_list = None + +#------------------------------------------------------------------------------ +# Logger for this module. +#------------------------------------------------------------------------------ +logger = None + +def listener(environ, start_response, schema): + ''' + Handler for the Vendor Event Listener REST API. + + Extract headers and the body and check that: + + 1) The client authenticated themselves correctly. + 2) The body validates against the provided schema for the API. + + ''' + logger.info('Got a Vendor Event request') + print('==== ' + time.asctime() + ' ' + '=' * 49) + + #-------------------------------------------------------------------------- + # Extract the content from the request. + #-------------------------------------------------------------------------- + length = int(environ.get('CONTENT_LENGTH', '0')) + logger.debug('Content Length: {0}'.format(length)) + body = environ['wsgi.input'].read(length) + logger.debug('Content Body: {0}'.format(body)) + + mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION', + 'None None')) + # logger.debug('Auth. Mode: {0} Credentials: {1}'.format(mode, + # b64_credentials)) + logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode)) + if (b64_credentials != 'None'): + credentials = b64decode(b64_credentials) + else: + credentials = None + + # logger.debug('Credentials: {0}'.format(credentials)) + logger.debug('Credentials: ****') + + #-------------------------------------------------------------------------- + # If we have a schema file then check that the event matches that expected. + #-------------------------------------------------------------------------- + if (schema is not None): + logger.debug('Attempting to validate data: {0}\n' + 'Against schema: {1}'.format(body, schema)) + try: + decoded_body = json.loads(body) + jsonschema.validate(decoded_body, schema) + logger.info('Event is valid!') + print('Valid body decoded & checked against schema OK:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + + except jsonschema.SchemaError as e: + logger.error('Schema is not valid! {0}'.format(e)) + print('Schema is not valid! {0}'.format(e)) + + except jsonschema.ValidationError as e: + logger.warn('Event is not valid against schema! {0}'.format(e)) + print('Event is not valid against schema! {0}'.format(e)) + print('Bad JSON body decoded:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + + except Exception as e: + logger.error('Event invalid for unexpected reason! 
{0}'.format(e)) + print('Schema is not valid for unexpected reason! {0}'.format(e)) + else: + logger.debug('No schema so just decode JSON: {0}'.format(body)) + try: + decoded_body = json.loads(body) + print('Valid JSON body (no schema checking) decoded:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + logger.info('Event is valid JSON but not checked against schema!') + + except Exception as e: + logger.error('Event invalid for unexpected reason! {0}'.format(e)) + print('JSON body not valid for unexpected reason! {0}'.format(e)) + + #-------------------------------------------------------------------------- + # See whether the user authenticated themselves correctly. + #-------------------------------------------------------------------------- + if (credentials == (vel_username + ':' + vel_password)): + logger.debug('Authenticated OK') +# print('Authenticated OK') + + #---------------------------------------------------------------------- + # Respond to the caller. If we have a pending commandList from the + # testControl API, send it in response. + #---------------------------------------------------------------------- + global pending_command_list + if pending_command_list is not None: + start_response('202 Accepted', + [('Content-type', 'application/json')]) + response = pending_command_list + pending_command_list = None + + print('\n'+ '='*80) + print('Sending pending commandList in the response:\n' + '{0}'.format(json.dumps(response, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + print('='*80 + '\n') + yield json.dumps(response) + else: + start_response('202 Accepted', []) + yield '' + else: + logger.warn('Failed to authenticate OK; creds: ' + credentials) + print('Failed to authenticate agent credentials: ', credentials, + 'against expected ', vel_username, ':', vel_password) + + #---------------------------------------------------------------------- + # Respond to the caller. 
+ #---------------------------------------------------------------------- + start_response('401 Unauthorized', [ ('Content-type', + 'application/json')]) + req_error = { 'requestError': { + 'policyException': { + 'messageId': 'POL0001', + 'text': 'Failed to authenticate' + } + } + } + yield json.dumps(req_error) + + save_event(body) + +#-------------------------------------------------------------------------- +# Send event to influxdb +#-------------------------------------------------------------------------- +def send_to_influxdb(event,pdata): + url = 'http://{}:8086/write?db=veseventsdb'.format(influxdb) + print('Send {} to influxdb at {}: {}'.format(event,influxdb,pdata)) + r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'}) + print('influxdb return code {}'.format(r.status_code)) + if r.status_code != 204: + print('*** Influxdb save failed, return code {} ***'.format(r.status_code)) + +#-------------------------------------------------------------------------- +# Save event data +#-------------------------------------------------------------------------- +def save_event(body): + jobj = json.loads(body) + e = json.loads(body, object_hook=JSONObject) + + domain = jobj['event']['commonEventHeader']['domain'] + timestamp = jobj['event']['commonEventHeader']['lastEpochMicrosec'] + agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper( ) + if "LOCALHOST" in agent: + agent = "computehost" + source = jobj['event']['commonEventHeader']['sourceId'].upper( ) + + if e.event.commonEventHeader.domain == "heartbeat": + print('Found Heartbeat') + send_to_influxdb(event,'heartbeat,system={},sequence={}'.format(agent,e.event.commonEventHeader.sequence)) + + if 'measurementsForVfScalingFields' in jobj['event']: + print('Found measurementsForVfScalingFields') + +# "measurementsForVfScalingFields": { +# "additionalMeasurements": [ +# { +# "arrayOfFields": [ +# { +# "name": "load-longterm", +# "value": "0.34" +# }, +# { +# "name": "load-shortterm", +# "value": "0.32" +# }, +# { +# "name": "load-midterm", +# "value": "0.34" +# } +# ], +# "name": "load" +# } +# ], + + if 'additionalMeasurements' in jobj['event']['measurementsForVfScalingFields']: + for meas in jobj['event']['measurementsForVfScalingFields']['additionalMeasurements']: + name = meas['name'] + pdata = '{},system={}'.format(name,source) + for field in meas['arrayOfFields']: + pdata = pdata + ",{}={}".format(field['name'],field['value']) + i=pdata.find(',', pdata.find('system')) + pdata = pdata[:i] + ' ' + pdata[i+1:] + send_to_influxdb("systemLoad", pdata) + +# "memoryUsageArray": [ +# { +# "memoryBuffered": 269056.0, +# "memoryCached": 17636956.0, +# "memoryFree": 244731658240, +# "memorySlabRecl": 753160.0, +# "memorySlabUnrecl": 210800.0, +# "memoryUsed": 6240064.0, +# "vmIdentifier": "opnfv01" +# } +# ], + + if 'memoryUsageArray' in jobj['event']['measurementsForVfScalingFields']: + print('Found memoryUsageArray') + pdata = 'memoryUsage,system={}'.format(source) + vmid=e.event.measurementsForVfScalingFields.memoryUsageArray[0].vmIdentifier + d = jobj['event']['measurementsForVfScalingFields']['memoryUsageArray'][0].items() + for key,val in d: + if key != 'vmIdentifier': + pdata = pdata + ',{}={}'.format(key,val) + i=pdata.find(',', pdata.find('system')) + pdata = pdata[:i] + ' ' + pdata[i+1:] + send_to_influxdb("memoryUsage", pdata) + +# "vNicPerformanceArray": [ +# { +# "receivedDiscardedPacketsAccumulated": 0, +# "receivedErrorPacketsAccumulated": 0, +# "receivedOctetsAccumulated": 476.801524578, 
+# "receivedTotalPacketsAccumulated": 2.90000899705, +# "transmittedDiscardedPacketsAccumulated": 0, +# "transmittedErrorPacketsAccumulated": 0, +# "transmittedOctetsAccumulated": 230.100735749, +# "transmittedTotalPacketsAccumulated": 1.20000372292, +# "vNicIdentifier": "eno4", +# "valuesAreSuspect": "true" +# }, + + if 'vNicPerformanceArray' in jobj['event']['measurementsForVfScalingFields']: + print('Found vNicPerformanceArray') + for vnic in jobj['event']['measurementsForVfScalingFields']['vNicPerformanceArray']: + vnid=vnic['vNicIdentifier'] + pdata = 'vNicPerformance,system={},vnic={}'.format(vmid,vnid) + d = vnic.items() + for key,val in d: + if key != 'vNicIdentifier': + pdata = pdata + ',{}={}'.format(key,val) + i=pdata.find(',', pdata.find('vnic')) + pdata = pdata[:i] + ' ' + pdata[i+1:] + send_to_influxdb("vNicPerformance", pdata) + +def test_listener(environ, start_response, schema): + ''' + Handler for the Test Collector Test Control API. + + There is no authentication on this interface. + + This simply stores a commandList which will be sent in response to the next + incoming event on the EVEL interface. + ''' + global pending_command_list + logger.info('Got a Test Control input') + print('============================') + print('==== TEST CONTROL INPUT ====') + + #-------------------------------------------------------------------------- + # GET allows us to get the current pending request. + #-------------------------------------------------------------------------- + if environ.get('REQUEST_METHOD') == 'GET': + start_response('200 OK', [('Content-type', 'application/json')]) + yield json.dumps(pending_command_list) + return + + #-------------------------------------------------------------------------- + # Extract the content from the request. + #-------------------------------------------------------------------------- + length = int(environ.get('CONTENT_LENGTH', '0')) + logger.debug('TestControl Content Length: {0}'.format(length)) + body = environ['wsgi.input'].read(length) + logger.debug('TestControl Content Body: {0}'.format(body)) + + #-------------------------------------------------------------------------- + # If we have a schema file then check that the event matches that expected. 
+ #-------------------------------------------------------------------------- + if (schema is not None): + logger.debug('Attempting to validate data: {0}\n' + 'Against schema: {1}'.format(body, schema)) + try: + decoded_body = json.loads(body) + jsonschema.validate(decoded_body, schema) + logger.info('TestControl is valid!') + print('TestControl:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + + except jsonschema.SchemaError as e: + logger.error('TestControl Schema is not valid: {0}'.format(e)) + print('TestControl Schema is not valid: {0}'.format(e)) + + except jsonschema.ValidationError as e: + logger.warn('TestControl input not valid: {0}'.format(e)) + print('TestControl input not valid: {0}'.format(e)) + print('Bad JSON body decoded:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + + except Exception as e: + logger.error('TestControl input not valid: {0}'.format(e)) + print('TestControl Schema not valid: {0}'.format(e)) + else: + logger.debug('Missing schema just decode JSON: {0}'.format(body)) + try: + decoded_body = json.loads(body) + print('Valid JSON body (no schema checking) decoded:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + logger.info('TestControl input not checked against schema!') + + except Exception as e: + logger.error('TestControl input not valid: {0}'.format(e)) + print('TestControl input not valid: {0}'.format(e)) + + #-------------------------------------------------------------------------- + # Respond to the caller. If we received otherField 'ThrottleRequest', + # generate the appropriate canned response. + #-------------------------------------------------------------------------- + pending_command_list = decoded_body + print('===== TEST CONTROL END =====') + print('============================') + start_response('202 Accepted', []) + yield '' + +def main(argv=None): + ''' + Main function for the collector start-up. + + Called with command-line arguments: + * --config *<file>* + * --section *<section>* + * --verbose + + Where: + + *<file>* specifies the path to the configuration file. + + *<section>* specifies the section within that config file. + + *verbose* generates more information in the log files. + + The process listens for REST API invocations and checks them. Errors are + displayed to stdout and logged. + ''' + + if argv is None: + argv = sys.argv + else: + sys.argv.extend(argv) + + program_name = os.path.basename(sys.argv[0]) + program_version = 'v{0}'.format(__version__) + program_build_date = str(__updated__) + program_version_message = '%%(prog)s {0} ({1})'.format(program_version, + program_build_date) + if (__import__('__main__').__doc__ is not None): + program_shortdesc = __import__('__main__').__doc__.split('\n')[1] + else: + program_shortdesc = 'Running in test harness' + program_license = '''{0} + + Created on {1}. + Copyright 2015 Metaswitch Networks Ltd. All rights reserved. + + Distributed on an "AS IS" basis without warranties + or conditions of any kind, either express or implied. + +USAGE +'''.format(program_shortdesc, str(__date__)) + + try: + #---------------------------------------------------------------------- + # Setup argument parser so we can parse the command-line. 
+ #---------------------------------------------------------------------- + parser = ArgumentParser(description=program_license, + formatter_class=ArgumentDefaultsHelpFormatter) + parser.add_argument('-i', '--influxdb', + dest='influxdb', + default='localhost', + help='InfluxDB server addresss') + parser.add_argument('-v', '--verbose', + dest='verbose', + action='count', + help='set verbosity level') + parser.add_argument('-V', '--version', + action='version', + version=program_version_message, + help='Display version information') + parser.add_argument('-a', '--api-version', + dest='api_version', + default='5', + help='set API version') + parser.add_argument('-c', '--config', + dest='config', + default='/etc/opt/att/collector.conf', + help='Use this config file.', + metavar='<file>') + parser.add_argument('-s', '--section', + dest='section', + default='default', + metavar='<section>', + help='section to use in the config file') + + #---------------------------------------------------------------------- + # Process arguments received. + #---------------------------------------------------------------------- + args = parser.parse_args() + verbose = args.verbose + api_version = args.api_version + config_file = args.config + config_section = args.section + + #---------------------------------------------------------------------- + # Now read the config file, using command-line supplied values as + # overrides. + #---------------------------------------------------------------------- + defaults = {'log_file': 'collector.log', + 'vel_port': '12233', + 'vel_path': '', + 'vel_topic_name': '' + } + overrides = {} + config = ConfigParser.SafeConfigParser(defaults) + config.read(config_file) + + #---------------------------------------------------------------------- + # extract the values we want. + #---------------------------------------------------------------------- + global influxdb + global vel_username + global vel_password + global vel_topic_name + influxdb = config.get(config_section, 'influxdb', vars=overrides) + log_file = config.get(config_section, 'log_file', vars=overrides) + vel_port = config.get(config_section, 'vel_port', vars=overrides) + vel_path = config.get(config_section, 'vel_path', vars=overrides) + + vel_topic_name = config.get(config_section, + 'vel_topic_name', + vars=overrides) + vel_username = config.get(config_section, + 'vel_username', + vars=overrides) + vel_password = config.get(config_section, + 'vel_password', + vars=overrides) + vel_schema_file = config.get(config_section, + 'schema_file', + vars=overrides) + base_schema_file = config.get(config_section, + 'base_schema_file', + vars=overrides) + throttle_schema_file = config.get(config_section, + 'throttle_schema_file', + vars=overrides) + test_control_schema_file = config.get(config_section, + 'test_control_schema_file', + vars=overrides) + + #---------------------------------------------------------------------- + # Finally we have enough info to start a proper flow trace. 
+ #---------------------------------------------------------------------- + global logger + print('Logfile: {0}'.format(log_file)) + logger = logging.getLogger('monitor') + if verbose > 0: + print('Verbose mode on') + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + handler = logging.handlers.RotatingFileHandler(log_file, + maxBytes=1000000, + backupCount=10) + if (platform.system() == 'Windows'): + date_format = '%Y-%m-%d %H:%M:%S' + else: + date_format = '%Y-%m-%d %H:%M:%S.%f %z' + formatter = logging.Formatter('%(asctime)s %(name)s - ' + '%(levelname)s - %(message)s', + date_format) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.info('Started') + + #---------------------------------------------------------------------- + # Log the details of the configuration. + #---------------------------------------------------------------------- + logger.debug('Log file = {0}'.format(log_file)) + logger.debug('Influxdb server = {0}'.format(influxdb)) + logger.debug('Event Listener Port = {0}'.format(vel_port)) + logger.debug('Event Listener Path = {0}'.format(vel_path)) + logger.debug('Event Listener Topic = {0}'.format(vel_topic_name)) + logger.debug('Event Listener Username = {0}'.format(vel_username)) + # logger.debug('Event Listener Password = {0}'.format(vel_password)) + logger.debug('Event Listener JSON Schema File = {0}'.format( + vel_schema_file)) + logger.debug('Base JSON Schema File = {0}'.format(base_schema_file)) + logger.debug('Throttle JSON Schema File = {0}'.format( + throttle_schema_file)) + logger.debug('Test Control JSON Schema File = {0}'.format( + test_control_schema_file)) + + #---------------------------------------------------------------------- + # Perform some basic error checking on the config. + #---------------------------------------------------------------------- + if (int(vel_port) < 1024 or int(vel_port) > 65535): + logger.error('Invalid Vendor Event Listener port ({0}) ' + 'specified'.format(vel_port)) + raise RuntimeError('Invalid Vendor Event Listener port ({0}) ' + 'specified'.format(vel_port)) + + if (len(vel_path) > 0 and vel_path[-1] != '/'): + logger.warning('Event Listener Path ({0}) should have terminating ' + '"/"! Adding one on to configured string.'.format( + vel_path)) + vel_path += '/' + + #---------------------------------------------------------------------- + # Load up the vel_schema, if it exists. + #---------------------------------------------------------------------- + if not os.path.exists(vel_schema_file): + logger.warning('Event Listener Schema File ({0}) not found. ' + 'No validation will be undertaken.'.format( + vel_schema_file)) + else: + global vel_schema + global throttle_schema + global test_control_schema + vel_schema = json.load(open(vel_schema_file, 'r')) + logger.debug('Loaded the JSON schema file') + + #------------------------------------------------------------------ + # Load up the throttle_schema, if it exists. + #------------------------------------------------------------------ + if (os.path.exists(throttle_schema_file)): + logger.debug('Loading throttle schema') + throttle_fragment = json.load(open(throttle_schema_file, 'r')) + throttle_schema = {} + throttle_schema.update(vel_schema) + throttle_schema.update(throttle_fragment) + logger.debug('Loaded the throttle schema') + + #------------------------------------------------------------------ + # Load up the test control _schema, if it exists. 
+ #------------------------------------------------------------------ + if (os.path.exists(test_control_schema_file)): + logger.debug('Loading test control schema') + test_control_fragment = json.load( + open(test_control_schema_file, 'r')) + test_control_schema = {} + test_control_schema.update(vel_schema) + test_control_schema.update(test_control_fragment) + logger.debug('Loaded the test control schema') + + #------------------------------------------------------------------ + # Load up the base_schema, if it exists. + #------------------------------------------------------------------ + if (os.path.exists(base_schema_file)): + logger.debug('Updating the schema with base definition') + base_schema = json.load(open(base_schema_file, 'r')) + vel_schema.update(base_schema) + logger.debug('Updated the JSON schema file') + + #---------------------------------------------------------------------- + # We are now ready to get started with processing. Start-up the various + # components of the system in order: + # + # 1) Create the dispatcher. + # 2) Register the functions for the URLs of interest. + # 3) Run the webserver. + #---------------------------------------------------------------------- + root_url = '/{0}eventListener/v{1}{2}'.\ + format(vel_path, + api_version, + '/' + vel_topic_name + if len(vel_topic_name) > 0 + else '') + throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\ + format(vel_path, api_version) + set_404_content(root_url) + dispatcher = PathDispatcher() + vendor_event_listener = partial(listener, schema = vel_schema) + dispatcher.register('GET', root_url, vendor_event_listener) + dispatcher.register('POST', root_url, vendor_event_listener) + vendor_throttle_listener = partial(listener, schema = throttle_schema) + dispatcher.register('GET', throttle_url, vendor_throttle_listener) + dispatcher.register('POST', throttle_url, vendor_throttle_listener) + + #---------------------------------------------------------------------- + # We also add a POST-only mechanism for test control, so that we can + # send commands to a single attached client. + #---------------------------------------------------------------------- + test_control_url = '/testControl/v{0}/commandList'.format(api_version) + test_control_listener = partial(test_listener, + schema = test_control_schema) + dispatcher.register('POST', test_control_url, test_control_listener) + dispatcher.register('GET', test_control_url, test_control_listener) + + httpd = make_server('', int(vel_port), dispatcher) + print('Serving on port {0}...'.format(vel_port)) + httpd.serve_forever() + + logger.error('Main loop exited unexpectedly!') + return 0 + + except KeyboardInterrupt: + #---------------------------------------------------------------------- + # handle keyboard interrupt + #---------------------------------------------------------------------- + logger.info('Exiting on keyboard interrupt!') + return 0 + + except Exception as e: + #---------------------------------------------------------------------- + # Handle unexpected exceptions. 
+ #---------------------------------------------------------------------- + if DEBUG or TESTRUN: + raise(e) + indent = len(program_name) * ' ' + sys.stderr.write(program_name + ': ' + repr(e) + '\n') + sys.stderr.write(indent + ' for help use --help\n') + sys.stderr.write(traceback.format_exc()) + logger.critical('Exiting because of exception: {0}'.format(e)) + logger.critical(traceback.format_exc()) + return 2 + +#------------------------------------------------------------------------------ +# MAIN SCRIPT ENTRY POINT. +#------------------------------------------------------------------------------ +if __name__ == '__main__': + if TESTRUN: + #---------------------------------------------------------------------- + # Running tests - note that doctest comments haven't been included so + # this is a hook for future improvements. + #---------------------------------------------------------------------- + import doctest + doctest.testmod() + + if PROFILE: + #---------------------------------------------------------------------- + # Profiling performance. Performance isn't expected to be a major + # issue, but this should all work as expected. + #---------------------------------------------------------------------- + import cProfile + import pstats + profile_filename = 'collector_profile.txt' + cProfile.run('main()', profile_filename) + statsfile = open('collector_profile_stats.txt', 'wb') + p = pstats.Stats(profile_filename, stream=statsfile) + stats = p.strip_dirs().sort_stats('cumulative') + stats.print_stats() + statsfile.close() + sys.exit(0) + + #-------------------------------------------------------------------------- + # Normal operation - call through to the main function. + #-------------------------------------------------------------------------- + sys.exit(main()) diff --git a/tools/ves-setup.sh b/tools/ves-setup.sh index 65dd613..b37662b 100644 --- a/tools/ves-setup.sh +++ b/tools/ves-setup.sh @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -#. What this is: Setup script for VES agent framework. +#. What this is: Setup script for the VES monitoring framework. #. With this script VES support can be installed in one or more hosts, with: #. - a dedicated or shared Kafka server for collection of events from collectd #. - VES collectd agents running in host or guest mode @@ -23,6 +23,7 @@ #. pre-installed VES collector e.g. from the ONAP project. #. - Install Kafka server on one of the hosts, or use a pre-installed server #. accessible from the agent hosts. +#. - Install collectd on each host. #. - Install the VES agent on each host. #. - As needed, install the VES agent on each virtual host. This could include #. pre-installed VES agents in VM or container images, which are configured @@ -47,21 +48,13 @@ #. ves_kafka_host: kafka host IP or hostname (default: 127.0.0.1) #. #. Usage: -#. wget https://raw.githubusercontent.com/opnfv/ves/master/tools/ves-setup.sh -#. bash ves-setup.sh <collector|kafka|agent> +#. git clone https://gerrit.opnfv.org/gerrit/ves /tmp/ves +#. bash /tmp/ves/ves-setup.sh <collector|kafka|collectd|agent> #. collector: setup VES collector (test collector) #. kafka: setup kafka server for VES events from collect agent(s) -#. agent: setup VES agent in host or guest mode -#. Recommended sequence is: -#. ssh into your collector host and run these commands: -#. $ ves_host=$(ip route get 8.8.8.8 | awk '{print $NF; exit}') -#. $ export ves_host -#. $ bash ves-setup.sh collector -#. 
...then for each agent host: -#. copy ~/ves_env.sh and ves-setup.sh to the host e.g. via scp -#. ssh into the host and run, directly or via ssh -x -#. $ bash ves-setup.sh kafka -#. $ bash ves-setup.sh agent +#. collectd: setup collectd with libvirt plugin, as a kafka publisher +#. agent: setup VES agent in host or guest mode, as a kafka consumer +#. See demo_deploy.sh in this repo for a recommended sequence of the above. #. #. Status: this is a work in progress, under test. @@ -81,18 +74,27 @@ function log() { function common_prereqs() { log "install common prerequisites" - sudo apt-get install -y default-jre - sudo apt-get install -y zookeeperd - sudo apt-get install -y python-pip - sudo pip install kafka-python + if [[ ! -f /.dockerenv ]]; then dosudo="sudo"; fi + $dosudo apt-get update + $dosudo apt-get install -y git + # Required for kafka + $dosudo apt-get install -y default-jre + $dosudo apt-get install -y zookeeperd + $dosudo apt-get install -y python-pip + $dosudo pip install kafka-python + # Required for building collectd + $dosudo apt-get install -y pkg-config } function setup_env() { - if [[ ! -f ~/ves_env.sh ]]; then - cat <<EOF >~/ves_env.sh + if [[ ! -d /tmp/ves ]]; then mkdir /tmp/ves; fi + cp $0 /tmp/ves + if [[ ! -f /tmp/ves/ves_env.sh ]]; then + cat <<EOF >/tmp/ves/ves_env.sh #!/bin/bash ves_mode="${ves_mode:=host}" ves_host="${ves_host:=127.0.0.1}" +ves_hostname="${ves_hostname:=localhost}" ves_port="${ves_port:=30000}" ves_path="${ves_path:=}" ves_topic="${ves_topic:=}" @@ -105,6 +107,7 @@ ves_kafka_host="${ves_kafka_host:=127.0.0.1}" ves_kafka_port="${ves_kafka_port:=9092}" export ves_mode export ves_host +export ves_hostname export ves_port export ves_path export ves_topic @@ -112,57 +115,62 @@ export ves_https export ves_user export ves_pass export ves_interval -export ves_kafka_host -export ves_port +export ves_kafka_hostame export ves_kafka_port EOF fi - source ~/ves_env.sh + source /tmp/ves/ves_env.sh } function setup_kafka() { log "setup kafka server" common_prereqs + setup_env - log "get and unpack kafka_2.11-0.11.0.0.tgz" - wget "http://www-eu.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" - tar -xvzf kafka_2.11-0.11.0.0.tgz + cd + ver="0.11.0.2" + log "get and unpack kafka_2.11-$ver.tgz" + wget "http://www-eu.apache.org/dist/kafka/$ver/kafka_2.11-$ver.tgz" + tar -xvzf kafka_2.11-$ver.tgz log "set delete.topic.enable=true" sed -i -- 's/#delete.topic.enable=true/delete.topic.enable=true/' \ - kafka_2.11-0.11.0.0/config/server.properties - grep delete.topic.enable kafka_2.11-0.11.0.0/config/server.properties + kafka_2.11-$ver/config/server.properties + grep delete.topic.enable kafka_2.11-$ver/config/server.properties # TODO: Barometer VES guide to clarify hostname must be in /etc/hosts - sudo nohup kafka_2.11-0.11.0.0/bin/kafka-server-start.sh \ - kafka_2.11-0.11.0.0/config/server.properties \ - > kafka_2.11-0.11.0.0/kafka.log 2>&1 & - # TODO: find a test that does not hang the script at - # echo "Hello, World" | ~/kafka_2.11-0.11.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TopicTest > /dev/null - # ~/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TopicTest --from-beginning + sudo nohup kafka_2.11-$ver/bin/kafka-server-start.sh \ + kafka_2.11-$ver/config/server.properties \ + > kafka_2.11-$ver/kafka.log 2>&1 & } -function setup_agent() { - log "setup agent" - common_prereqs - - log "cleanup any previous failed install" - sudo rm -rf ~/collectd-virt - sudo rm -rf ~/librdkafka - sudo rm -rf 
~/collectd - +function setup_kafka_client() { log "Install Apache Kafka C/C++ client library" - sudo apt-get install -y build-essential + if [[ ! -f /.dockerenv ]]; then dosudo="sudo"; fi + $dosudo apt-get install -y build-essential git clone https://github.com/edenhill/librdkafka.git ~/librdkafka cd ~/librdkafka git checkout -b v0.9.5 v0.9.5 # TODO: Barometer VES guide to clarify specific prerequisites for Ubuntu - sudo apt-get install -y libpthread-stubs0-dev - sudo apt-get install -y libssl-dev - sudo apt-get install -y libsasl2-dev - sudo apt-get install -y liblz4-dev + $dosudo apt-get install -y libpthread-stubs0-dev + $dosudo apt-get install -y libssl-dev + $dosudo apt-get install -y libsasl2-dev + $dosudo apt-get install -y liblz4-dev ./configure --prefix=/usr make - sudo make install + $dosudo make install +} + +function setup_collectd() { + log "setup collectd" + common_prereqs + setup_env + + log "cleanup any previous failed install" + sudo rm -rf ~/collectd-virt + sudo rm -rf ~/librdkafka + sudo rm -rf ~/collectd + + setup_kafka_client log "Build collectd with Kafka support" git clone https://github.com/collectd/collectd.git ~/collectd @@ -194,29 +202,6 @@ function setup_agent() { sudo systemctl daemon-reload sudo systemctl start collectd.service - log "install VES agent prerequisites" - sudo pip install pyyaml - - log "clone OPNFV Barometer" - git clone https://gerrit.opnfv.org/gerrit/barometer ~/barometer - - log "setup ves_app_config.conf" - cd ~/barometer/3rd_party/collectd-ves-app/ves_app - cat <<EOF >ves_app_config.conf -[config] -Domain = $ves_host -Port = $ves_port -Path = $ves_path -Topic = $ves_topic -UseHttps = $ves_https -Username = $ves_user -Password = $ves_pass -SendEventInterval = $ves_interval -ApiVersion = $ves_version -KafkaPort = $ves_kafka_port -KafkaBroker = $ves_kafka_host -EOF - log "setup VES collectd config for VES $ves_mode mode" if [[ "$ves_mode" == "host" ]]; then # TODO: Barometer VES guide to clarify prerequisites install for Ubuntu @@ -228,19 +213,16 @@ EOF # http://docs.opnfv.org/en/latest/submodules/barometer/docs/release/userguide/feature.userguide.html#virt-plugin sudo systemctl start libvirtd - git clone https://github.com/maryamtahhan/collectd ~/collectd-virt - cd ~/collectd-virt + rm -rf /tmp/ves/collectd-virt + git clone https://github.com/maryamtahhan/collectd /tmp/ves/collectd-virt + cd /tmp/ves/collectd-virt ./build.sh ./configure --enable-syslog --enable-logfile --enable-debug make sudo make install - - # TODO: Barometer VES guide refers to "At least one VM instance should be - # up and running by hypervisor on the host." The process needs to accomodate - # pre-installation of the VES agent *prior* to the first VM being created. 
- cat <<EOF | sudo tee -a /opt/collectd/etc/collectd.conf +# for VES plugin LoadPlugin logfile <Plugin logfile> LogLevel info @@ -270,6 +252,7 @@ LoadPlugin write_kafka EOF else cat <<EOF | sudo tee -a /opt/collectd/etc/collectd.conf +# for VES plugin LoadPlugin logfile <Plugin logfile> LogLevel info @@ -280,15 +263,6 @@ LoadPlugin logfile LoadPlugin cpu -#LoadPlugin virt -#<Plugin virt> -# Connection "qemu:///system" -# RefreshInterval 60 -# HostnameFormat uuid -# PluginInstanceFormat name -# ExtraStats "cpu_util" -#</Plugin> - LoadPlugin write_kafka <Plugin write_kafka> Property "metadata.broker.list" "$ves_kafka_host:$ves_kafka_port" @@ -299,14 +273,61 @@ LoadPlugin write_kafka EOF fi + if [[ $(grep -c $ves_hostname /etc/hosts) -eq 0 ]]; then + log "add to /etc/hosts: $ves_kafka_host $ves_hostname" + echo "$ves_kafka_host $ves_hostname" | sudo tee -a /etc/hosts + log "restart collectd to apply updated config" sudo systemctl restart collectd +} - log "start VES agent" - cd ~/barometer/3rd_party/collectd-ves-app/ves_app - nohup python ves_app.py \ - --events-schema=$ves_mode.yaml \ - --config=ves_app_config.conf > ~/ves_app.stdout.log 2>&1 & +function setup_agent() { + log "setup VES agent" + if [[ ! -f /.dockerenv ]]; then + log "start the ves-agent container" + sudo docker run -it -d -v /tmp/ves:/opt/ves --name=ves-agent \ + ubuntu:xenial /bin/bash + log "execute the agent setup script in the container" + sudo docker exec ves-agent /bin/bash /opt/ves/ves-setup.sh agent + else + common_prereqs + log "setup the VES environment" + source /opt/ves/ves_env.sh + log "install agent prerequisites" + pip install pyaml + + setup_kafka_client + + log "clone OPNFV Barometer" + rm -rf /opt/ves/barometer + git clone https://gerrit.opnfv.org/gerrit/barometer /opt/ves/barometer + + log "setup ves_app_config.conf" + source /opt/ves/ves_env.sh + cd /opt/ves/barometer/3rd_party/collectd-ves-app/ves_app + cat <<EOF >ves_app_config.conf +[config] +Domain = $ves_host +Port = $ves_port +Path = $ves_path +Topic = $ves_topic +UseHttps = $ves_https +Username = $ves_user +Password = $ves_pass +SendEventInterval = $ves_interval +ApiVersion = $ves_version +KafkaPort = $ves_kafka_port +KafkaBroker = $ves_kafka_host +EOF + +# log "add guest.yaml measurements to host.yaml (enables actual host data)" +# tail --lines=+24 guest.yaml >>host.yaml + + log "start VES agent" + echo "$ves_kafka_host $ves_hostname">>/etc/hosts + nohup python ves_app.py --events-schema=$ves_mode.yaml --loglevel ERROR \ + --config=ves_app_config.conf > /opt/ves/ves_app.stdout.log 2>&1 & + fi } function setup_collector() { @@ -314,29 +335,19 @@ function setup_collector() { log "install prerequistes" sudo apt-get install -y jq + ves_hostname=$HOSTNAME + export ves_hostname ves_host=$(ip route get 8.8.8.8 | awk '{print $NF; exit}') export ves_host setup_env - echo "cleanup any earlier install attempts" - sudo docker stop influxdb - sudo docker rm influxdb - sudo docker stop grafana - sudo docker rm grafana - sudo docker stop ves-collector - sudo docker rm -v ves-collector - sudo rm -rf /tmp/ves - - log "clone OPNFV VES" - git clone https://gerrit.opnfv.org/gerrit/ves /tmp/ves - log "setup influxdb container" - sudo docker run -d --name=influxdb -p 8086:8086 influxdb - status=$(sudo docker inspect influxdb | jq -r '.[0].State.Status') + sudo docker run -d --name=ves-influxdb -p 8086:8086 influxdb + status=$(sudo docker inspect ves-influxdb | jq -r '.[0].State.Status') while [[ "x$status" != "xrunning" ]]; do log "InfluxDB container state is 
($status)" sleep 10 - status=$(sudo docker inspect influxdb | jq -r '.[0].State.Status') + status=$(sudo docker inspect ves-influxdb | jq -r '.[0].State.Status') done log "InfluxDB container state is $status" @@ -351,23 +362,23 @@ function setup_collector() { --data-urlencode "q=CREATE DATABASE veseventsdb" log "install Grafana container" - sudo docker run -d --name grafana -p 3000:3000 grafana/grafana - status=$(sudo docker inspect grafana | jq -r '.[0].State.Status') + sudo docker run -d --name ves-grafana -p 3001:3000 grafana/grafana + status=$(sudo docker inspect ves-grafana | jq -r '.[0].State.Status') while [[ "x$status" != "xrunning" ]]; do log "Grafana container state is ($status)" sleep 10 - status=$(sudo docker inspect grafana | jq -r '.[0].State.Status') + status=$(sudo docker inspect ves-grafana | jq -r '.[0].State.Status') done log "Grafana container state is $status" log "wait for Grafana API to be active" - while ! curl http://$ves_host:3000 ; do + while ! curl http://$ves_host:3001 ; do log "Grafana API is not yet responding... waiting 10 seconds" sleep 10 done log "add VESEvents datasource to Grafana" - cat <<EOF >datasource.json + cat <<EOF >/tmp/ves/datasource.json { "name":"VESEvents", "type":"influxdb", "access":"direct", @@ -385,18 +396,19 @@ function setup_collector() { EOF curl -H "Accept: application/json" -H "Content-type: application/json" \ - -X POST -d @datasource.json \ - http://admin:admin@$ves_host:3000/api/datasources + -X POST -d @/tmp/ves/datasource.json \ + http://admin:admin@$ves_host:3001/api/datasources log "add VES dashboard to Grafana" curl -H "Accept: application/json" -H "Content-type: application/json" \ -X POST \ - -d @/tmp/ves/tests/onap-demo/blueprints/tosca-vnfd-onap-demo/Dashboard.json\ - http://admin:admin@$ves_host:3000/api/dashboards/db + -d @/tmp/ves/tools/grafana/Dashboard.json\ + http://admin:admin@$ves_host:3001/api/dashboards/db log "setup collector container" cd /tmp/ves touch monitor.log + rm -rf /tmp/ves/evel-test-collector git clone https://github.com/att/evel-test-collector.git sed -i -- \ "s~log_file = /var/log/att/collector.log~log_file = /opt/ves/collector.log~" \ @@ -411,8 +423,10 @@ EOF evel-test-collector/config/collector.conf sed -i -- "s~vel_topic_name = example_vnf~vel_topic_name = $ves_topic~g" \ evel-test-collector/config/collector.conf + sed -i -- "/vel_topic_name = /a influxdb = $ves_host" \ + evel-test-collector/config/collector.conf - cp tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py \ + cp /tmp/ves/tools/monitor.py \ evel-test-collector/code/collector/monitor.py # Note below: python (2.7) is required due to dependency on module 'ConfigParser' @@ -424,28 +438,46 @@ pip install requests python /opt/ves/evel-test-collector/code/collector/monitor.py \ --config /opt/ves/evel-test-collector/config/collector.conf \ --influxdb $ves_host \ ---section default > /opt/ves/monitor.log +--section default > /opt/ves/monitor.log 2>&1 & EOF sudo docker run -it -d -v /tmp/ves:/opt/ves --name=ves-collector \ -p 30000:30000 ubuntu:xenial /bin/bash - sudo docker exec -it -d ves-collector bash /opt/ves/setup-collector.sh + sudo docker exec ves-collector /bin/bash /opt/ves/setup-collector.sh # debug hints # sudo docker exec -it ves-collector apt-get install -y tcpdump # sudo docker exec -it ves-collector tcpdump -A -v -s 0 -i any port 30000 # curl http://$ves_host:30000 # sudo docker exec -it ves-collector /bin/bash - # ~/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic collectd + # 
+  # ~/kafka_2.11-0.11.0.2/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic collectd
+}
+
+function clean() {
+  log "clean installation for $1 at $2"
+  if [[ "$1" == "master" ]]; then
+    cs="ves-agent ves-collector ves-grafana ves-influxdb"
+    for c in $cs; do
+      log "stop and remove container $c"
+      sudo docker stop $c
+      sudo docker rm -v $c
+    done
+  fi
+  log "remove collectd config for VES"
+  sudo sed -i -- '/VES plugin/,$d' /opt/collectd/etc/collectd.conf
+  sudo systemctl restart collectd
+  sudo rm -rf /tmp/ves
+}
 
 dist=`grep DISTRIB_ID /etc/*-release | awk -F '=' '{print $2}'`
-setup_env
 if [[ $(grep -c $HOSTNAME /etc/hosts) -eq 0 ]]; then
   echo "$(ip route get 8.8.8.8 | awk '{print $NF; exit}') $HOSTNAME" |\
   sudo tee -a /etc/hosts
 fi
 
 case "$1" in
+  "collectd")
+    setup_collectd
+    ;;
   "agent")
     setup_agent
     ;;
@@ -455,6 +487,9 @@ case "$1" in
   "kafka")
     setup_kafka
     ;;
+  "clean")
+    clean $2 $3
+    ;;
   *)
     grep '#. ' $0
 esac
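A few post-deploy spot checks, drawn from the debug hints above (assuming the defaults set by ves-setup.sh: collector on port 30000, Grafana on 3001, Kafka topic "collectd"):

    # containers created on the master
    sudo docker ps --format '{{.Names}}' | grep ves-
    # watch raw collectd events arriving on the Kafka topic
    ~/kafka_2.11-0.11.0.2/bin/kafka-console-consumer.sh \
      --zookeeper localhost:2181 --topic collectd
    # poke the collector, then confirm the Grafana datasource registered
    curl http://$ves_host:30000
    curl http://admin:admin@$ves_host:3001/api/datasources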