author     Bryan Sullivan <bryan.sullivan@att.com>  2017-11-15 16:27:17 -0800
committer  Bryan Sullivan <bryan.sullivan@att.com>  2017-11-19 20:48:54 -0800
commit     4b0b335d54946cbb202dfdf0545d499cf559faaa
tree       2fa112f525c071a08881e0ec6bee7c3b6c1ea674
parent     c40be26f0586fa931c986e2eea40477c524d381d
Updated monitor.py and grafana dashboard
JIRA: VES-2
Various fixes in ves-setup
Add demo_deploy.sh
Refactored to containerized agent and collector

Change-Id: I8851465742aaf40a4cce265508a3d2d66abced08
Signed-off-by: Bryan Sullivan <bryan.sullivan@att.com>
-rw-r--r--  tools/demo_deploy.sh           75
-rw-r--r--  tools/grafana/Dashboard.json  763
-rw-r--r--  tools/monitor.py              765
-rw-r--r--  tools/ves-setup.sh            279
4 files changed, 1760 insertions, 122 deletions
diff --git a/tools/demo_deploy.sh b/tools/demo_deploy.sh
new file mode 100644
index 0000000..b0b76e8
--- /dev/null
+++ b/tools/demo_deploy.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+# Copyright 2017 AT&T Intellectual Property, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#. What this is: Complete scripted deployment of the VES monitoring framework
+#. When complete, the following will be installed:
+#. - On the specified master node, a Kafka server and containers running the
+#.   OPNFV Barometer VES agent, OPNFV VES collector, InfluxDB, and Grafana
+#. - On each specified worker node, collectd configured per OPNFV Barometer
+#.
+#. Prerequisites:
+#. - Ubuntu server for kubernetes cluster nodes (master and worker nodes)
+#. - MAAS server as cluster admin for kubernetes master/worker nodes
+#. - Password-less ssh key provided for node setup
+#. - hostname of kubernetes master setup in DNS or /etc/hosts
+#. Usage: on the MAAS server
+#. $ git clone https://gerrit.opnfv.org/gerrit/ves ~/ves
+#. $ bash ~/ves/tools/demo_deploy.sh master <node> <key>
+#. master: setup VES on k8s master
+#. <node>: IP of cluster master node
+#. <key>: SSH key enabling password-less SSH to nodes
+#. $ bash ~/ves/tools/demo_deploy.sh worker <node> <key>
+#. worker: setup VES on k8s worker
+#. <node>: IP of worker node
+#. <key>: SSH key enabling password-less SSH to nodes
+
+node=$2
+key=$3
+
+eval `ssh-agent`
+ssh-add $key
+if [[ "$1" == "master" ]]; then
+ echo; echo "$0 $(date): Setting up master node"
+ scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+ ~/ves ubuntu@$node:/tmp
+ ssh -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$node <<EOF
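+    # note: the heredoc delimiter above is unquoted, so the values below are
+    # expanded locally on the MAAS server before the script runs on the node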
+    ves_host="$node"
+ export ves_host
+ ves_mode="guest"
+ export ves_mode
+ ves_user="hello"
+ export ves_user
+ ves_pass="world"
+ export ves_pass
+ ves_kafka_host="$node"
+ export ves_kafka_host
+ bash /tmp/ves/tools/ves-setup.sh collector
+ bash /tmp/ves/tools/ves-setup.sh kafka
+ bash /tmp/ves/tools/ves-setup.sh collectd
+ bash /tmp/ves/tools/ves-setup.sh agent
+EOF
+  mkdir -p /tmp/ves
+ scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+ ubuntu@$node:/tmp/ves/ves_env.sh /tmp/ves/.
+ echo "VES Grafana dashboards are available at http://$node:3001 (login as admin/admin)"
+else
+ echo; echo "$0 $(date): Setting up collectd at $node"
+ scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+ ~/ves ubuntu@$node:/tmp
+ scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+ /tmp/ves/ves_env.sh ubuntu@$node:/tmp/ves/.
+ ssh -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+ ubuntu@$node bash /tmp/ves/tools/ves-setup.sh collectd
+fi
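
For reference, a full demo run from the MAAS server then looks like the
following (the addresses and key path are illustrative, not part of this
commit):

  git clone https://gerrit.opnfv.org/gerrit/ves ~/ves
  # master first: deploys the collector, kafka, collectd, and agent
  bash ~/ves/tools/demo_deploy.sh master 192.168.10.10 ~/.ssh/id_rsa
  # then each worker: deploys collectd, reusing the ves_env.sh that the
  # master step copied back to /tmp/ves
  bash ~/ves/tools/demo_deploy.sh worker 192.168.10.11 ~/.ssh/id_rsa

When the master step finishes, Grafana should answer at
http://192.168.10.10:3001 (login as admin/admin).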
diff --git a/tools/grafana/Dashboard.json b/tools/grafana/Dashboard.json
new file mode 100644
index 0000000..d59e5da
--- /dev/null
+++ b/tools/grafana/Dashboard.json
@@ -0,0 +1,763 @@
+{
+"dashboard": {
+ "editable": true,
+ "gnetId": null,
+ "graphTooltip": 0,
+ "hideControls": false,
+ "id": null,
+ "links": [],
+ "refresh": "10s",
+ "rows": [
+ {
+ "collapse": false,
+ "height": 273,
+ "panels": [
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "VESEvents",
+ "fill": 1,
+ "grid": {
+ "leftLogBase": 1,
+ "leftMax": null,
+ "leftMin": null,
+ "rightLogBase": 1,
+ "rightMax": null,
+ "rightMin": null
+ },
+ "id": 3,
+ "interval": "30s",
+ "legend": {
+ "alignAsTable": false,
+ "avg": true,
+ "current": true,
+ "max": true,
+ "min": true,
+ "rightSide": false,
+ "show": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "span": 6,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "alias": "CpuUser",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "load",
+ "orderByTime": "ASC",
+ "policy": "default",
+ "query": "SELECT mean(\"cpuuser\") FROM \"cpu\" WHERE \"system\" = 'computehost' AND $timeFilter GROUP BY time(1m) fill(null)",
+ "refId": "A",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "load-longterm"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": []
+ },
+ {
+ "alias": "CpuSystem",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "load",
+ "orderByTime": "ASC",
+ "policy": "default",
+ "query": "SELECT mean(\"cpusystem\") FROM \"cpu\" WHERE $timeFilter GROUP BY time(1m) fill(null)",
+ "refId": "B",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "load-shortterm"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": [
+ {
+ "key": "system",
+ "operator": "=",
+ "value": "OPNFV01"
+ }
+ ]
+ },
+ {
+ "alias": "CpuIdle",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "load",
+ "orderByTime": "ASC",
+ "policy": "default",
+ "query": "SELECT mean(\"cpuidle\") FROM \"cpu\" WHERE $timeFilter GROUP BY time(1m) fill(null)",
+ "refId": "C",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "load-midterm"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": [
+ {
+ "key": "system",
+ "operator": "=",
+ "value": "OPNFV01"
+ }
+ ]
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "host load",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "x-axis": true,
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "y-axis": true,
+ "y_formats": [
+ "short",
+ "short"
+ ],
+ "yaxes": [
+ {
+ "format": "short",
+ "label": "Percent",
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ]
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "VESEvents",
+ "fill": 1,
+ "id": 5,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "span": 6,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "alias": "Free",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "memoryUsage",
+ "orderByTime": "ASC",
+ "policy": "autogen",
+ "refId": "A",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "memoryFree"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": []
+ },
+ {
+ "alias": "Used",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "memoryUsage",
+ "orderByTime": "ASC",
+ "policy": "autogen",
+ "refId": "B",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "memoryUsed"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": []
+ },
+ {
+ "alias": "Buffered",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "memoryUsage",
+ "orderByTime": "ASC",
+ "policy": "autogen",
+ "refId": "C",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "memoryBuffered"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": []
+ },
+ {
+ "alias": "Cached",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "memoryUsage",
+ "orderByTime": "ASC",
+ "policy": "autogen",
+ "refId": "D",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "memoryCached"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": []
+ },
+ {
+ "alias": "SlabRecl",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "memoryUsage",
+ "orderByTime": "ASC",
+ "policy": "autogen",
+ "refId": "E",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "memorySlabRecl"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": []
+ },
+ {
+ "alias": "SlabUnrecl",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "memoryUsage",
+ "orderByTime": "ASC",
+ "policy": "autogen",
+ "refId": "F",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "memorySlabUnrecl"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": []
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Memory",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 10,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ]
+ }
+ ],
+ "repeat": null,
+ "repeatIteration": null,
+ "repeatRowId": null,
+ "showTitle": false,
+ "title": "Dashboard Row",
+ "titleSize": "h6"
+ },
+ {
+ "collapse": false,
+ "height": 250,
+ "panels": [
+ {
+ "alert": {
+ "conditions": [
+ {
+ "evaluator": {
+ "params": [
+ 15000000
+ ],
+ "type": "gt"
+ },
+ "operator": {
+ "type": "and"
+ },
+ "query": {
+ "params": [
+ "A",
+ "1s",
+ "now"
+ ]
+ },
+ "reducer": {
+ "params": [],
+ "type": "avg"
+ },
+ "type": "query"
+ }
+ ],
+ "executionErrorState": "alerting",
+ "frequency": "1s",
+ "handler": 1,
+ "message": "Transmitted Traffic Exceeded limits\nClosed Loop Action:Apply Firewall Rules",
+ "name": "VES webserver_1 Network Usage alert",
+ "noDataState": "no_data",
+ "notifications": []
+ },
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "VESEvents",
+ "fill": 1,
+ "grid": {
+ "leftLogBase": 1,
+ "leftMax": null,
+ "leftMin": null,
+ "rightLogBase": 1,
+ "rightMax": null,
+ "rightMin": null
+ },
+ "id": 2,
+ "interval": "30s",
+ "legend": {
+ "alignAsTable": false,
+ "avg": true,
+ "current": false,
+ "max": false,
+ "min": false,
+ "rightSide": false,
+ "show": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 2,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "span": 6,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "alias": "rxOctets",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ },
+ {
+ "params": [
+ "vnic"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "vNicPerformance",
+ "orderByTime": "ASC",
+ "policy": "default",
+ "query": "SELECT derivative(mean(\"rxoctetsacc\"), 10s) FROM \"vnic\" WHERE \"system\" = 'computehost' AND $timeFilter GROUP BY time(1m) fill(null)",
+ "refId": "A",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "receivedOctetsAccumulated"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": []
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Received Octets",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "x-axis": true,
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "y-axis": true,
+ "y_formats": [
+ "short",
+ "short"
+ ],
+ "yaxes": [
+ {
+ "format": "short",
+ "label": "Octets/Packets",
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ]
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "VESEvents",
+ "fill": 1,
+ "grid": {
+ "leftLogBase": 1,
+ "leftMax": null,
+ "leftMin": null,
+ "rightLogBase": 1,
+ "rightMax": null,
+ "rightMin": null
+ },
+ "id": 4,
+ "interval": "30s",
+ "legend": {
+ "alignAsTable": false,
+ "avg": true,
+ "current": false,
+ "max": false,
+ "min": false,
+ "rightSide": false,
+ "show": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 2,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "span": 6,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "alias": "txOctets",
+ "dsType": "influxdb",
+ "groupBy": [
+ {
+ "params": [
+ "system"
+ ],
+ "type": "tag"
+ },
+ {
+ "params": [
+ "vnic"
+ ],
+ "type": "tag"
+ }
+ ],
+ "measurement": "vNicPerformance",
+ "orderByTime": "ASC",
+ "policy": "default",
+ "refId": "B",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "transmittedOctetsAccumulated"
+ ],
+ "type": "field"
+ }
+ ]
+ ],
+ "tags": []
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Transmitted Octets",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "x-axis": true,
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "y-axis": true,
+ "y_formats": [
+ "short",
+ "short"
+ ],
+ "yaxes": [
+ {
+ "format": "short",
+ "label": "Octets/Packets",
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ]
+ }
+ ],
+ "repeat": null,
+ "repeatIteration": null,
+ "repeatRowId": null,
+ "showTitle": false,
+ "title": "Dashboard Row",
+ "titleSize": "h6"
+ }
+ ],
+ "schemaVersion": 14,
+ "style": "dark",
+ "tags": [],
+ "templating": {
+ "list": []
+ },
+ "time": {
+ "from": "now-30m",
+ "to": "now"
+ },
+ "timepicker": {
+ "now": true,
+ "refresh_intervals": [
+ "10s",
+ "20s",
+ "30s",
+ "1m"
+ ],
+ "time_options": [
+ "5m",
+ "15m",
+ "1h",
+ "6h",
+ "12h",
+ "24h",
+ "2d",
+ "7d",
+ "30d"
+ ]
+ },
+ "timezone": "browser",
+ "title": "VES Demo",
+ "version": 3
+}
+}
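
ves-setup.sh (below) imports this dashboard through the Grafana HTTP API; the
equivalent manual call should look roughly like this (host illustrative,
admin/admin being the default Grafana login this setup relies on):

  curl -H "Accept: application/json" -H "Content-type: application/json" \
    -X POST -d @tools/grafana/Dashboard.json \
    http://admin:admin@192.168.10.10:3001/api/dashboards/db

The top-level "dashboard" wrapper around the definition above is the payload
shape the /api/dashboards/db endpoint expects, which is why the file is not a
bare dashboard object.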
diff --git a/tools/monitor.py b/tools/monitor.py
new file mode 100644
index 0000000..91c0eae
--- /dev/null
+++ b/tools/monitor.py
@@ -0,0 +1,765 @@
+#!/usr/bin/env python
+#
+# Copyright 2016-2017 AT&T Intellectual Property, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# What this is: Monitor and closed-loop policy agent as part of the OPNFV VES
+# ves_onap_demo.
+#
+# Status: this is a work in progress, under test.
+
+from rest_dispatcher import PathDispatcher, set_404_content
+from wsgiref.simple_server import make_server
+import sys
+import os
+import platform
+import traceback
+import time
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import ConfigParser
+import logging.handlers
+from base64 import b64decode
+import string
+import json
+import jsonschema
+from functools import partial
+import requests
+
+monitor_mode = "f"
+vdu_id = ['','','','','','']
+summary_e = ['***** Summary of key stats *****','','','']
+summary_c = ['Collectd agents:']
+status = ['','Started','Started','Started']
+base_url = ''
+template_404 = b'''POST {0}'''
+columns = 0
+rows = 0
+
+class JSONObject:
+ def __init__(self, d):
+ self.__dict__ = d
+
+__all__ = []
+__version__ = 0.1
+__date__ = '2015-12-04'
+__updated__ = '2015-12-04'
+
+TESTRUN = False
+DEBUG = False
+PROFILE = False
+
+#------------------------------------------------------------------------------
+# Address of influxdb server.
+#------------------------------------------------------------------------------
+
+influxdb = '127.0.0.1'
+
+#------------------------------------------------------------------------------
+# Credentials we expect clients to authenticate themselves with.
+#------------------------------------------------------------------------------
+vel_username = ''
+vel_password = ''
+
+#------------------------------------------------------------------------------
+# The JSON schema which we will use to validate events.
+#------------------------------------------------------------------------------
+vel_schema = None
+
+#------------------------------------------------------------------------------
+# The JSON schema which we will use to validate client throttle state.
+#------------------------------------------------------------------------------
+throttle_schema = None
+
+#------------------------------------------------------------------------------
+# The JSON schema which we will use to provoke throttling commands for testing.
+#------------------------------------------------------------------------------
+test_control_schema = None
+
+#------------------------------------------------------------------------------
+# Pending command list from the testControl API
+# This is sent as a response commandList to the next received event.
+#------------------------------------------------------------------------------
+pending_command_list = None
+
+#------------------------------------------------------------------------------
+# Logger for this module.
+#------------------------------------------------------------------------------
+logger = None
+
+def listener(environ, start_response, schema):
+ '''
+ Handler for the Vendor Event Listener REST API.
+
+ Extract headers and the body and check that:
+
+ 1) The client authenticated themselves correctly.
+ 2) The body validates against the provided schema for the API.
+
+ '''
+ logger.info('Got a Vendor Event request')
+ print('==== ' + time.asctime() + ' ' + '=' * 49)
+
+ #--------------------------------------------------------------------------
+ # Extract the content from the request.
+ #--------------------------------------------------------------------------
+ length = int(environ.get('CONTENT_LENGTH', '0'))
+ logger.debug('Content Length: {0}'.format(length))
+ body = environ['wsgi.input'].read(length)
+ logger.debug('Content Body: {0}'.format(body))
+
+ mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION',
+ 'None None'))
+ # logger.debug('Auth. Mode: {0} Credentials: {1}'.format(mode,
+ # b64_credentials))
+ logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))
+ if (b64_credentials != 'None'):
+ credentials = b64decode(b64_credentials)
+ else:
+ credentials = None
+
+ # logger.debug('Credentials: {0}'.format(credentials))
+ logger.debug('Credentials: ****')
+
+ #--------------------------------------------------------------------------
+ # If we have a schema file then check that the event matches that expected.
+ #--------------------------------------------------------------------------
+ if (schema is not None):
+ logger.debug('Attempting to validate data: {0}\n'
+ 'Against schema: {1}'.format(body, schema))
+ try:
+ decoded_body = json.loads(body)
+ jsonschema.validate(decoded_body, schema)
+ logger.info('Event is valid!')
+ print('Valid body decoded & checked against schema OK:\n'
+ '{0}'.format(json.dumps(decoded_body,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
+
+ except jsonschema.SchemaError as e:
+ logger.error('Schema is not valid! {0}'.format(e))
+ print('Schema is not valid! {0}'.format(e))
+
+ except jsonschema.ValidationError as e:
+ logger.warn('Event is not valid against schema! {0}'.format(e))
+ print('Event is not valid against schema! {0}'.format(e))
+ print('Bad JSON body decoded:\n'
+ '{0}'.format(json.dumps(decoded_body,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
+
+ except Exception as e:
+ logger.error('Event invalid for unexpected reason! {0}'.format(e))
+ print('Schema is not valid for unexpected reason! {0}'.format(e))
+ else:
+ logger.debug('No schema so just decode JSON: {0}'.format(body))
+ try:
+ decoded_body = json.loads(body)
+ print('Valid JSON body (no schema checking) decoded:\n'
+ '{0}'.format(json.dumps(decoded_body,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
+ logger.info('Event is valid JSON but not checked against schema!')
+
+ except Exception as e:
+ logger.error('Event invalid for unexpected reason! {0}'.format(e))
+ print('JSON body not valid for unexpected reason! {0}'.format(e))
+
+ #--------------------------------------------------------------------------
+ # See whether the user authenticated themselves correctly.
+ #--------------------------------------------------------------------------
+ if (credentials == (vel_username + ':' + vel_password)):
+ logger.debug('Authenticated OK')
+# print('Authenticated OK')
+
+ #----------------------------------------------------------------------
+ # Respond to the caller. If we have a pending commandList from the
+ # testControl API, send it in response.
+ #----------------------------------------------------------------------
+ global pending_command_list
+ if pending_command_list is not None:
+ start_response('202 Accepted',
+ [('Content-type', 'application/json')])
+ response = pending_command_list
+ pending_command_list = None
+
+ print('\n'+ '='*80)
+ print('Sending pending commandList in the response:\n'
+ '{0}'.format(json.dumps(response,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
+ print('='*80 + '\n')
+ yield json.dumps(response)
+ else:
+ start_response('202 Accepted', [])
+ yield ''
+ else:
+        logger.warn('Failed to authenticate OK; creds: {0}'.format(credentials))
+ print('Failed to authenticate agent credentials: ', credentials,
+ 'against expected ', vel_username, ':', vel_password)
+
+ #----------------------------------------------------------------------
+ # Respond to the caller.
+ #----------------------------------------------------------------------
+ start_response('401 Unauthorized', [ ('Content-type',
+ 'application/json')])
+ req_error = { 'requestError': {
+ 'policyException': {
+ 'messageId': 'POL0001',
+ 'text': 'Failed to authenticate'
+ }
+ }
+ }
+ yield json.dumps(req_error)
+
+ save_event(body)
+
+#--------------------------------------------------------------------------
+# Send event to influxdb
+#--------------------------------------------------------------------------
+def send_to_influxdb(event,pdata):
+ url = 'http://{}:8086/write?db=veseventsdb'.format(influxdb)
+ print('Send {} to influxdb at {}: {}'.format(event,influxdb,pdata))
+ r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
+ print('influxdb return code {}'.format(r.status_code))
+ if r.status_code != 204:
+ print('*** Influxdb save failed, return code {} ***'.format(r.status_code))
+
+#--------------------------------------------------------------------------
+# Save event data
+#--------------------------------------------------------------------------
+def save_event(body):
+ jobj = json.loads(body)
+ e = json.loads(body, object_hook=JSONObject)
+
+    domain = jobj['event']['commonEventHeader']['domain']
+    timestamp = jobj['event']['commonEventHeader']['lastEpochMicrosec']
+    agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper()
+    if "LOCALHOST" in agent:
+        agent = "computehost"
+    source = jobj['event']['commonEventHeader']['sourceId'].upper()
+
+    if e.event.commonEventHeader.domain == "heartbeat":
+        print('Found Heartbeat')
+        send_to_influxdb('heartbeat', 'heartbeat,system={} sequence={}'.format(agent, e.event.commonEventHeader.sequence))
+
+ if 'measurementsForVfScalingFields' in jobj['event']:
+ print('Found measurementsForVfScalingFields')
+
+# "measurementsForVfScalingFields": {
+# "additionalMeasurements": [
+# {
+# "arrayOfFields": [
+# {
+# "name": "load-longterm",
+# "value": "0.34"
+# },
+# {
+# "name": "load-shortterm",
+# "value": "0.32"
+# },
+# {
+# "name": "load-midterm",
+# "value": "0.34"
+# }
+# ],
+# "name": "load"
+# }
+# ],
+
+ if 'additionalMeasurements' in jobj['event']['measurementsForVfScalingFields']:
+ for meas in jobj['event']['measurementsForVfScalingFields']['additionalMeasurements']:
+ name = meas['name']
+ pdata = '{},system={}'.format(name,source)
+ for field in meas['arrayOfFields']:
+ pdata = pdata + ",{}={}".format(field['name'],field['value'])
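+                # influxdb line protocol: convert the first comma after the
+                # tag set into a space below, separating tags from fields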
+ i=pdata.find(',', pdata.find('system'))
+ pdata = pdata[:i] + ' ' + pdata[i+1:]
+ send_to_influxdb("systemLoad", pdata)
+
+# "memoryUsageArray": [
+# {
+# "memoryBuffered": 269056.0,
+# "memoryCached": 17636956.0,
+# "memoryFree": 244731658240,
+# "memorySlabRecl": 753160.0,
+# "memorySlabUnrecl": 210800.0,
+# "memoryUsed": 6240064.0,
+# "vmIdentifier": "opnfv01"
+# }
+# ],
+
+ if 'memoryUsageArray' in jobj['event']['measurementsForVfScalingFields']:
+ print('Found memoryUsageArray')
+ pdata = 'memoryUsage,system={}'.format(source)
+ vmid=e.event.measurementsForVfScalingFields.memoryUsageArray[0].vmIdentifier
+ d = jobj['event']['measurementsForVfScalingFields']['memoryUsageArray'][0].items()
+ for key,val in d:
+ if key != 'vmIdentifier':
+ pdata = pdata + ',{}={}'.format(key,val)
+ i=pdata.find(',', pdata.find('system'))
+ pdata = pdata[:i] + ' ' + pdata[i+1:]
+ send_to_influxdb("memoryUsage", pdata)
+
+# "vNicPerformanceArray": [
+# {
+# "receivedDiscardedPacketsAccumulated": 0,
+# "receivedErrorPacketsAccumulated": 0,
+# "receivedOctetsAccumulated": 476.801524578,
+# "receivedTotalPacketsAccumulated": 2.90000899705,
+# "transmittedDiscardedPacketsAccumulated": 0,
+# "transmittedErrorPacketsAccumulated": 0,
+# "transmittedOctetsAccumulated": 230.100735749,
+# "transmittedTotalPacketsAccumulated": 1.20000372292,
+# "vNicIdentifier": "eno4",
+# "valuesAreSuspect": "true"
+# },
+
+ if 'vNicPerformanceArray' in jobj['event']['measurementsForVfScalingFields']:
+ print('Found vNicPerformanceArray')
+ for vnic in jobj['event']['measurementsForVfScalingFields']['vNicPerformanceArray']:
+ vnid=vnic['vNicIdentifier']
+            pdata = 'vNicPerformance,system={},vnic={}'.format(source,vnid)  # source, not vmid: vmid is unset unless memoryUsageArray was present
+ d = vnic.items()
+ for key,val in d:
+ if key != 'vNicIdentifier':
+ pdata = pdata + ',{}={}'.format(key,val)
+ i=pdata.find(',', pdata.find('vnic'))
+ pdata = pdata[:i] + ' ' + pdata[i+1:]
+ send_to_influxdb("vNicPerformance", pdata)
+
+def test_listener(environ, start_response, schema):
+ '''
+ Handler for the Test Collector Test Control API.
+
+ There is no authentication on this interface.
+
+ This simply stores a commandList which will be sent in response to the next
+ incoming event on the EVEL interface.
+ '''
+ global pending_command_list
+ logger.info('Got a Test Control input')
+ print('============================')
+ print('==== TEST CONTROL INPUT ====')
+
+ #--------------------------------------------------------------------------
+ # GET allows us to get the current pending request.
+ #--------------------------------------------------------------------------
+ if environ.get('REQUEST_METHOD') == 'GET':
+ start_response('200 OK', [('Content-type', 'application/json')])
+ yield json.dumps(pending_command_list)
+ return
+
+ #--------------------------------------------------------------------------
+ # Extract the content from the request.
+ #--------------------------------------------------------------------------
+ length = int(environ.get('CONTENT_LENGTH', '0'))
+ logger.debug('TestControl Content Length: {0}'.format(length))
+ body = environ['wsgi.input'].read(length)
+ logger.debug('TestControl Content Body: {0}'.format(body))
+
+ #--------------------------------------------------------------------------
+ # If we have a schema file then check that the event matches that expected.
+ #--------------------------------------------------------------------------
+ if (schema is not None):
+ logger.debug('Attempting to validate data: {0}\n'
+ 'Against schema: {1}'.format(body, schema))
+ try:
+ decoded_body = json.loads(body)
+ jsonschema.validate(decoded_body, schema)
+ logger.info('TestControl is valid!')
+ print('TestControl:\n'
+ '{0}'.format(json.dumps(decoded_body,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
+
+ except jsonschema.SchemaError as e:
+ logger.error('TestControl Schema is not valid: {0}'.format(e))
+ print('TestControl Schema is not valid: {0}'.format(e))
+
+ except jsonschema.ValidationError as e:
+ logger.warn('TestControl input not valid: {0}'.format(e))
+ print('TestControl input not valid: {0}'.format(e))
+ print('Bad JSON body decoded:\n'
+ '{0}'.format(json.dumps(decoded_body,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
+
+ except Exception as e:
+ logger.error('TestControl input not valid: {0}'.format(e))
+ print('TestControl Schema not valid: {0}'.format(e))
+ else:
+ logger.debug('Missing schema just decode JSON: {0}'.format(body))
+ try:
+ decoded_body = json.loads(body)
+ print('Valid JSON body (no schema checking) decoded:\n'
+ '{0}'.format(json.dumps(decoded_body,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
+ logger.info('TestControl input not checked against schema!')
+
+ except Exception as e:
+ logger.error('TestControl input not valid: {0}'.format(e))
+ print('TestControl input not valid: {0}'.format(e))
+
+ #--------------------------------------------------------------------------
+ # Respond to the caller. If we received otherField 'ThrottleRequest',
+ # generate the appropriate canned response.
+ #--------------------------------------------------------------------------
+ pending_command_list = decoded_body
+ print('===== TEST CONTROL END =====')
+ print('============================')
+ start_response('202 Accepted', [])
+ yield ''
+
+def main(argv=None):
+ '''
+ Main function for the collector start-up.
+
+ Called with command-line arguments:
+ * --config *<file>*
+ * --section *<section>*
+ * --verbose
+
+ Where:
+
+ *<file>* specifies the path to the configuration file.
+
+ *<section>* specifies the section within that config file.
+
+ *verbose* generates more information in the log files.
+
+ The process listens for REST API invocations and checks them. Errors are
+ displayed to stdout and logged.
+ '''
+
+ if argv is None:
+ argv = sys.argv
+ else:
+ sys.argv.extend(argv)
+
+ program_name = os.path.basename(sys.argv[0])
+ program_version = 'v{0}'.format(__version__)
+ program_build_date = str(__updated__)
+ program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
+ program_build_date)
+ if (__import__('__main__').__doc__ is not None):
+ program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
+ else:
+ program_shortdesc = 'Running in test harness'
+ program_license = '''{0}
+
+ Created on {1}.
+ Copyright 2015 Metaswitch Networks Ltd. All rights reserved.
+
+ Distributed on an "AS IS" basis without warranties
+ or conditions of any kind, either express or implied.
+
+USAGE
+'''.format(program_shortdesc, str(__date__))
+
+ try:
+ #----------------------------------------------------------------------
+ # Setup argument parser so we can parse the command-line.
+ #----------------------------------------------------------------------
+ parser = ArgumentParser(description=program_license,
+ formatter_class=ArgumentDefaultsHelpFormatter)
+        parser.add_argument('-i', '--influxdb',
+                            dest='influxdb',
+                            default='localhost',
+                            help='InfluxDB server address')
+ parser.add_argument('-v', '--verbose',
+ dest='verbose',
+ action='count',
+ help='set verbosity level')
+ parser.add_argument('-V', '--version',
+ action='version',
+ version=program_version_message,
+ help='Display version information')
+ parser.add_argument('-a', '--api-version',
+ dest='api_version',
+ default='5',
+ help='set API version')
+ parser.add_argument('-c', '--config',
+ dest='config',
+ default='/etc/opt/att/collector.conf',
+ help='Use this config file.',
+ metavar='<file>')
+ parser.add_argument('-s', '--section',
+ dest='section',
+ default='default',
+ metavar='<section>',
+ help='section to use in the config file')
+
+ #----------------------------------------------------------------------
+ # Process arguments received.
+ #----------------------------------------------------------------------
+ args = parser.parse_args()
+ verbose = args.verbose
+ api_version = args.api_version
+ config_file = args.config
+ config_section = args.section
+
+ #----------------------------------------------------------------------
+ # Now read the config file, using command-line supplied values as
+ # overrides.
+ #----------------------------------------------------------------------
+ defaults = {'log_file': 'collector.log',
+ 'vel_port': '12233',
+ 'vel_path': '',
+ 'vel_topic_name': ''
+ }
+ overrides = {}
+ config = ConfigParser.SafeConfigParser(defaults)
+ config.read(config_file)
+
+ #----------------------------------------------------------------------
+ # extract the values we want.
+ #----------------------------------------------------------------------
+ global influxdb
+ global vel_username
+ global vel_password
+ global vel_topic_name
+ influxdb = config.get(config_section, 'influxdb', vars=overrides)
+ log_file = config.get(config_section, 'log_file', vars=overrides)
+ vel_port = config.get(config_section, 'vel_port', vars=overrides)
+ vel_path = config.get(config_section, 'vel_path', vars=overrides)
+
+ vel_topic_name = config.get(config_section,
+ 'vel_topic_name',
+ vars=overrides)
+ vel_username = config.get(config_section,
+ 'vel_username',
+ vars=overrides)
+ vel_password = config.get(config_section,
+ 'vel_password',
+ vars=overrides)
+ vel_schema_file = config.get(config_section,
+ 'schema_file',
+ vars=overrides)
+ base_schema_file = config.get(config_section,
+ 'base_schema_file',
+ vars=overrides)
+ throttle_schema_file = config.get(config_section,
+ 'throttle_schema_file',
+ vars=overrides)
+ test_control_schema_file = config.get(config_section,
+ 'test_control_schema_file',
+ vars=overrides)
+
+ #----------------------------------------------------------------------
+ # Finally we have enough info to start a proper flow trace.
+ #----------------------------------------------------------------------
+ global logger
+ print('Logfile: {0}'.format(log_file))
+ logger = logging.getLogger('monitor')
+ if verbose > 0:
+ print('Verbose mode on')
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.setLevel(logging.INFO)
+ handler = logging.handlers.RotatingFileHandler(log_file,
+ maxBytes=1000000,
+ backupCount=10)
+ if (platform.system() == 'Windows'):
+ date_format = '%Y-%m-%d %H:%M:%S'
+ else:
+ date_format = '%Y-%m-%d %H:%M:%S.%f %z'
+ formatter = logging.Formatter('%(asctime)s %(name)s - '
+ '%(levelname)s - %(message)s',
+ date_format)
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+ logger.info('Started')
+
+ #----------------------------------------------------------------------
+ # Log the details of the configuration.
+ #----------------------------------------------------------------------
+ logger.debug('Log file = {0}'.format(log_file))
+ logger.debug('Influxdb server = {0}'.format(influxdb))
+ logger.debug('Event Listener Port = {0}'.format(vel_port))
+ logger.debug('Event Listener Path = {0}'.format(vel_path))
+ logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
+ logger.debug('Event Listener Username = {0}'.format(vel_username))
+ # logger.debug('Event Listener Password = {0}'.format(vel_password))
+ logger.debug('Event Listener JSON Schema File = {0}'.format(
+ vel_schema_file))
+ logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
+ logger.debug('Throttle JSON Schema File = {0}'.format(
+ throttle_schema_file))
+ logger.debug('Test Control JSON Schema File = {0}'.format(
+ test_control_schema_file))
+
+ #----------------------------------------------------------------------
+ # Perform some basic error checking on the config.
+ #----------------------------------------------------------------------
+ if (int(vel_port) < 1024 or int(vel_port) > 65535):
+ logger.error('Invalid Vendor Event Listener port ({0}) '
+ 'specified'.format(vel_port))
+ raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
+ 'specified'.format(vel_port))
+
+ if (len(vel_path) > 0 and vel_path[-1] != '/'):
+ logger.warning('Event Listener Path ({0}) should have terminating '
+ '"/"! Adding one on to configured string.'.format(
+ vel_path))
+ vel_path += '/'
+
+ #----------------------------------------------------------------------
+ # Load up the vel_schema, if it exists.
+ #----------------------------------------------------------------------
+ if not os.path.exists(vel_schema_file):
+ logger.warning('Event Listener Schema File ({0}) not found. '
+ 'No validation will be undertaken.'.format(
+ vel_schema_file))
+ else:
+ global vel_schema
+ global throttle_schema
+ global test_control_schema
+ vel_schema = json.load(open(vel_schema_file, 'r'))
+ logger.debug('Loaded the JSON schema file')
+
+ #------------------------------------------------------------------
+ # Load up the throttle_schema, if it exists.
+ #------------------------------------------------------------------
+ if (os.path.exists(throttle_schema_file)):
+ logger.debug('Loading throttle schema')
+ throttle_fragment = json.load(open(throttle_schema_file, 'r'))
+ throttle_schema = {}
+ throttle_schema.update(vel_schema)
+ throttle_schema.update(throttle_fragment)
+ logger.debug('Loaded the throttle schema')
+
+ #------------------------------------------------------------------
+ # Load up the test control _schema, if it exists.
+ #------------------------------------------------------------------
+ if (os.path.exists(test_control_schema_file)):
+ logger.debug('Loading test control schema')
+ test_control_fragment = json.load(
+ open(test_control_schema_file, 'r'))
+ test_control_schema = {}
+ test_control_schema.update(vel_schema)
+ test_control_schema.update(test_control_fragment)
+ logger.debug('Loaded the test control schema')
+
+ #------------------------------------------------------------------
+ # Load up the base_schema, if it exists.
+ #------------------------------------------------------------------
+ if (os.path.exists(base_schema_file)):
+ logger.debug('Updating the schema with base definition')
+ base_schema = json.load(open(base_schema_file, 'r'))
+ vel_schema.update(base_schema)
+ logger.debug('Updated the JSON schema file')
+
+ #----------------------------------------------------------------------
+ # We are now ready to get started with processing. Start-up the various
+ # components of the system in order:
+ #
+ # 1) Create the dispatcher.
+ # 2) Register the functions for the URLs of interest.
+ # 3) Run the webserver.
+ #----------------------------------------------------------------------
+ root_url = '/{0}eventListener/v{1}{2}'.\
+ format(vel_path,
+ api_version,
+ '/' + vel_topic_name
+ if len(vel_topic_name) > 0
+ else '')
+ throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
+ format(vel_path, api_version)
+ set_404_content(root_url)
+ dispatcher = PathDispatcher()
+ vendor_event_listener = partial(listener, schema = vel_schema)
+ dispatcher.register('GET', root_url, vendor_event_listener)
+ dispatcher.register('POST', root_url, vendor_event_listener)
+ vendor_throttle_listener = partial(listener, schema = throttle_schema)
+ dispatcher.register('GET', throttle_url, vendor_throttle_listener)
+ dispatcher.register('POST', throttle_url, vendor_throttle_listener)
+
+ #----------------------------------------------------------------------
+ # We also add a POST-only mechanism for test control, so that we can
+ # send commands to a single attached client.
+ #----------------------------------------------------------------------
+ test_control_url = '/testControl/v{0}/commandList'.format(api_version)
+ test_control_listener = partial(test_listener,
+ schema = test_control_schema)
+ dispatcher.register('POST', test_control_url, test_control_listener)
+ dispatcher.register('GET', test_control_url, test_control_listener)
+
+ httpd = make_server('', int(vel_port), dispatcher)
+ print('Serving on port {0}...'.format(vel_port))
+ httpd.serve_forever()
+
+ logger.error('Main loop exited unexpectedly!')
+ return 0
+
+ except KeyboardInterrupt:
+ #----------------------------------------------------------------------
+ # handle keyboard interrupt
+ #----------------------------------------------------------------------
+ logger.info('Exiting on keyboard interrupt!')
+ return 0
+
+ except Exception as e:
+ #----------------------------------------------------------------------
+ # Handle unexpected exceptions.
+ #----------------------------------------------------------------------
+ if DEBUG or TESTRUN:
+ raise(e)
+ indent = len(program_name) * ' '
+ sys.stderr.write(program_name + ': ' + repr(e) + '\n')
+ sys.stderr.write(indent + ' for help use --help\n')
+ sys.stderr.write(traceback.format_exc())
+ logger.critical('Exiting because of exception: {0}'.format(e))
+ logger.critical(traceback.format_exc())
+ return 2
+
+#------------------------------------------------------------------------------
+# MAIN SCRIPT ENTRY POINT.
+#------------------------------------------------------------------------------
+if __name__ == '__main__':
+ if TESTRUN:
+ #----------------------------------------------------------------------
+ # Running tests - note that doctest comments haven't been included so
+ # this is a hook for future improvements.
+ #----------------------------------------------------------------------
+ import doctest
+ doctest.testmod()
+
+ if PROFILE:
+ #----------------------------------------------------------------------
+ # Profiling performance. Performance isn't expected to be a major
+ # issue, but this should all work as expected.
+ #----------------------------------------------------------------------
+ import cProfile
+ import pstats
+ profile_filename = 'collector_profile.txt'
+ cProfile.run('main()', profile_filename)
+ statsfile = open('collector_profile_stats.txt', 'wb')
+ p = pstats.Stats(profile_filename, stream=statsfile)
+ stats = p.strip_dirs().sort_stats('cumulative')
+ stats.print_stats()
+ statsfile.close()
+ sys.exit(0)
+
+ #--------------------------------------------------------------------------
+ # Normal operation - call through to the main function.
+ #--------------------------------------------------------------------------
+ sys.exit(main())
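
To sketch the data path monitor.py implements: an agent POSTs a VES event to
the listener with HTTP basic auth (hello/world in demo_deploy.sh above), and
save_event() re-emits selected fields to InfluxDB line protocol via
send_to_influxdb(). Both legs can be exercised by hand, e.g. (host
illustrative; 30000 is the port published by the collector container; the
event body is a minimal sketch covering only the fields save_event() reads,
not a schema-complete VES event):

  # post a minimal heartbeat event to the Vendor Event Listener API
  curl -u hello:world -H "Content-Type: application/json" -X POST \
    -d '{"event": {"commonEventHeader": {"domain": "heartbeat",
        "sequence": 1, "reportingEntityName": "computehost",
        "sourceId": "computehost", "lastEpochMicrosec": 0}}}' \
    http://192.168.10.10:30000/eventListener/v5

  # the equivalent of send_to_influxdb(): write line protocol to veseventsdb
  curl -X POST 'http://192.168.10.10:8086/write?db=veseventsdb' \
    --data-binary 'heartbeat,system=COMPUTEHOST sequence=1'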
diff --git a/tools/ves-setup.sh b/tools/ves-setup.sh
index 65dd613..b37662b 100644
--- a/tools/ves-setup.sh
+++ b/tools/ves-setup.sh
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-#. What this is: Setup script for VES agent framework.
+#. What this is: Setup script for the VES monitoring framework.
#. With this script VES support can be installed in one or more hosts, with:
#. - a dedicated or shared Kafka server for collection of events from collectd
#. - VES collectd agents running in host or guest mode
@@ -23,6 +23,7 @@
#. pre-installed VES collector e.g. from the ONAP project.
#. - Install Kafka server on one of the hosts, or use a pre-installed server
#. accessible from the agent hosts.
+#. - Install collectd on each host.
#. - Install the VES agent on each host.
#. - As needed, install the VES agent on each virtual host. This could include
#. pre-installed VES agents in VM or container images, which are configured
@@ -47,21 +48,13 @@
#. ves_kafka_host: kafka host IP or hostname (default: 127.0.0.1)
#.
#. Usage:
-#. wget https://raw.githubusercontent.com/opnfv/ves/master/tools/ves-setup.sh
-#. bash ves-setup.sh <collector|kafka|agent>
+#. git clone https://gerrit.opnfv.org/gerrit/ves /tmp/ves
+#.   bash /tmp/ves/tools/ves-setup.sh <collector|kafka|collectd|agent>
#. collector: setup VES collector (test collector)
#. kafka: setup kafka server for VES events from collect agent(s)
-#. agent: setup VES agent in host or guest mode
-#. Recommended sequence is:
-#. ssh into your collector host and run these commands:
-#. $ ves_host=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
-#. $ export ves_host
-#. $ bash ves-setup.sh collector
-#. ...then for each agent host:
-#. copy ~/ves_env.sh and ves-setup.sh to the host e.g. via scp
-#. ssh into the host and run, directly or via ssh -x
-#. $ bash ves-setup.sh kafka
-#. $ bash ves-setup.sh agent
+#. collectd: setup collectd with libvirt plugin, as a kafka publisher
+#. agent: setup VES agent in host or guest mode, as a kafka consumer
+#. See demo_deploy.sh in this repo for a recommended sequence of the above.
#.
#. Status: this is a work in progress, under test.
@@ -81,18 +74,27 @@ function log() {
function common_prereqs() {
log "install common prerequisites"
- sudo apt-get install -y default-jre
- sudo apt-get install -y zookeeperd
- sudo apt-get install -y python-pip
- sudo pip install kafka-python
+ if [[ ! -f /.dockerenv ]]; then dosudo="sudo"; fi
+ $dosudo apt-get update
+ $dosudo apt-get install -y git
+ # Required for kafka
+ $dosudo apt-get install -y default-jre
+ $dosudo apt-get install -y zookeeperd
+ $dosudo apt-get install -y python-pip
+ $dosudo pip install kafka-python
+ # Required for building collectd
+ $dosudo apt-get install -y pkg-config
}
function setup_env() {
- if [[ ! -f ~/ves_env.sh ]]; then
- cat <<EOF >~/ves_env.sh
+ if [[ ! -d /tmp/ves ]]; then mkdir /tmp/ves; fi
+ cp $0 /tmp/ves
+ if [[ ! -f /tmp/ves/ves_env.sh ]]; then
+ cat <<EOF >/tmp/ves/ves_env.sh
#!/bin/bash
ves_mode="${ves_mode:=host}"
ves_host="${ves_host:=127.0.0.1}"
+ves_hostname="${ves_hostname:=localhost}"
ves_port="${ves_port:=30000}"
ves_path="${ves_path:=}"
ves_topic="${ves_topic:=}"
@@ -105,6 +107,7 @@ ves_kafka_host="${ves_kafka_host:=127.0.0.1}"
ves_kafka_port="${ves_kafka_port:=9092}"
export ves_mode
export ves_host
+export ves_hostname
export ves_port
export ves_path
export ves_topic
@@ -112,57 +115,62 @@ export ves_https
export ves_user
export ves_pass
export ves_interval
-export ves_kafka_host
-export ves_port
+export ves_kafka_host
export ves_kafka_port
EOF
fi
- source ~/ves_env.sh
+ source /tmp/ves/ves_env.sh
}
function setup_kafka() {
log "setup kafka server"
common_prereqs
+ setup_env
- log "get and unpack kafka_2.11-0.11.0.0.tgz"
- wget "http://www-eu.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz"
- tar -xvzf kafka_2.11-0.11.0.0.tgz
+ cd
+ ver="0.11.0.2"
+ log "get and unpack kafka_2.11-$ver.tgz"
+ wget "http://www-eu.apache.org/dist/kafka/$ver/kafka_2.11-$ver.tgz"
+ tar -xvzf kafka_2.11-$ver.tgz
log "set delete.topic.enable=true"
sed -i -- 's/#delete.topic.enable=true/delete.topic.enable=true/' \
- kafka_2.11-0.11.0.0/config/server.properties
- grep delete.topic.enable kafka_2.11-0.11.0.0/config/server.properties
+ kafka_2.11-$ver/config/server.properties
+ grep delete.topic.enable kafka_2.11-$ver/config/server.properties
# TODO: Barometer VES guide to clarify hostname must be in /etc/hosts
- sudo nohup kafka_2.11-0.11.0.0/bin/kafka-server-start.sh \
- kafka_2.11-0.11.0.0/config/server.properties \
- > kafka_2.11-0.11.0.0/kafka.log 2>&1 &
- # TODO: find a test that does not hang the script at
- # echo "Hello, World" | ~/kafka_2.11-0.11.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TopicTest > /dev/null
- # ~/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TopicTest --from-beginning
+ sudo nohup kafka_2.11-$ver/bin/kafka-server-start.sh \
+ kafka_2.11-$ver/config/server.properties \
+ > kafka_2.11-$ver/kafka.log 2>&1 &
}
-function setup_agent() {
- log "setup agent"
- common_prereqs
-
- log "cleanup any previous failed install"
- sudo rm -rf ~/collectd-virt
- sudo rm -rf ~/librdkafka
- sudo rm -rf ~/collectd
-
+function setup_kafka_client() {
log "Install Apache Kafka C/C++ client library"
- sudo apt-get install -y build-essential
+ if [[ ! -f /.dockerenv ]]; then dosudo="sudo"; fi
+ $dosudo apt-get install -y build-essential
git clone https://github.com/edenhill/librdkafka.git ~/librdkafka
cd ~/librdkafka
git checkout -b v0.9.5 v0.9.5
# TODO: Barometer VES guide to clarify specific prerequisites for Ubuntu
- sudo apt-get install -y libpthread-stubs0-dev
- sudo apt-get install -y libssl-dev
- sudo apt-get install -y libsasl2-dev
- sudo apt-get install -y liblz4-dev
+ $dosudo apt-get install -y libpthread-stubs0-dev
+ $dosudo apt-get install -y libssl-dev
+ $dosudo apt-get install -y libsasl2-dev
+ $dosudo apt-get install -y liblz4-dev
./configure --prefix=/usr
make
- sudo make install
+ $dosudo make install
+}
+
+function setup_collectd() {
+ log "setup collectd"
+ common_prereqs
+ setup_env
+
+ log "cleanup any previous failed install"
+ sudo rm -rf ~/collectd-virt
+ sudo rm -rf ~/librdkafka
+ sudo rm -rf ~/collectd
+
+ setup_kafka_client
log "Build collectd with Kafka support"
git clone https://github.com/collectd/collectd.git ~/collectd
@@ -194,29 +202,6 @@ function setup_agent() {
sudo systemctl daemon-reload
sudo systemctl start collectd.service
- log "install VES agent prerequisites"
- sudo pip install pyyaml
-
- log "clone OPNFV Barometer"
- git clone https://gerrit.opnfv.org/gerrit/barometer ~/barometer
-
- log "setup ves_app_config.conf"
- cd ~/barometer/3rd_party/collectd-ves-app/ves_app
- cat <<EOF >ves_app_config.conf
-[config]
-Domain = $ves_host
-Port = $ves_port
-Path = $ves_path
-Topic = $ves_topic
-UseHttps = $ves_https
-Username = $ves_user
-Password = $ves_pass
-SendEventInterval = $ves_interval
-ApiVersion = $ves_version
-KafkaPort = $ves_kafka_port
-KafkaBroker = $ves_kafka_host
-EOF
-
log "setup VES collectd config for VES $ves_mode mode"
if [[ "$ves_mode" == "host" ]]; then
# TODO: Barometer VES guide to clarify prerequisites install for Ubuntu
@@ -228,19 +213,16 @@ EOF
# http://docs.opnfv.org/en/latest/submodules/barometer/docs/release/userguide/feature.userguide.html#virt-plugin
sudo systemctl start libvirtd
- git clone https://github.com/maryamtahhan/collectd ~/collectd-virt
- cd ~/collectd-virt
+ rm -rf /tmp/ves/collectd-virt
+ git clone https://github.com/maryamtahhan/collectd /tmp/ves/collectd-virt
+ cd /tmp/ves/collectd-virt
./build.sh
./configure --enable-syslog --enable-logfile --enable-debug
make
sudo make install
-
- # TODO: Barometer VES guide refers to "At least one VM instance should be
- # up and running by hypervisor on the host." The process needs to accomodate
- # pre-installation of the VES agent *prior* to the first VM being created.
-
cat <<EOF | sudo tee -a /opt/collectd/etc/collectd.conf
+# for VES plugin
LoadPlugin logfile
<Plugin logfile>
LogLevel info
@@ -270,6 +252,7 @@ LoadPlugin write_kafka
EOF
else
cat <<EOF | sudo tee -a /opt/collectd/etc/collectd.conf
+# for VES plugin
LoadPlugin logfile
<Plugin logfile>
LogLevel info
@@ -280,15 +263,6 @@ LoadPlugin logfile
LoadPlugin cpu
-#LoadPlugin virt
-#<Plugin virt>
-# Connection "qemu:///system"
-# RefreshInterval 60
-# HostnameFormat uuid
-# PluginInstanceFormat name
-# ExtraStats "cpu_util"
-#</Plugin>
-
LoadPlugin write_kafka
<Plugin write_kafka>
Property "metadata.broker.list" "$ves_kafka_host:$ves_kafka_port"
@@ -299,14 +273,61 @@ LoadPlugin write_kafka
EOF
fi
+ if [[ $(grep -c $ves_hostname /etc/hosts) -eq 0 ]]; then
+ log "add to /etc/hosts: $ves_kafka_host $ves_hostname"
+    echo "$ves_kafka_host $ves_hostname" | sudo tee -a /etc/hosts
+  fi
+
log "restart collectd to apply updated config"
sudo systemctl restart collectd
+}
- log "start VES agent"
- cd ~/barometer/3rd_party/collectd-ves-app/ves_app
- nohup python ves_app.py \
- --events-schema=$ves_mode.yaml \
- --config=ves_app_config.conf > ~/ves_app.stdout.log 2>&1 &
+function setup_agent() {
+ log "setup VES agent"
+ if [[ ! -f /.dockerenv ]]; then
+ log "start the ves-agent container"
+ sudo docker run -it -d -v /tmp/ves:/opt/ves --name=ves-agent \
+ ubuntu:xenial /bin/bash
+ log "execute the agent setup script in the container"
+ sudo docker exec ves-agent /bin/bash /opt/ves/ves-setup.sh agent
+ else
+ common_prereqs
+ log "setup the VES environment"
+ source /opt/ves/ves_env.sh
+ log "install agent prerequisites"
+    pip install pyyaml
+
+ setup_kafka_client
+
+ log "clone OPNFV Barometer"
+ rm -rf /opt/ves/barometer
+ git clone https://gerrit.opnfv.org/gerrit/barometer /opt/ves/barometer
+
+ log "setup ves_app_config.conf"
+ source /opt/ves/ves_env.sh
+ cd /opt/ves/barometer/3rd_party/collectd-ves-app/ves_app
+ cat <<EOF >ves_app_config.conf
+[config]
+Domain = $ves_host
+Port = $ves_port
+Path = $ves_path
+Topic = $ves_topic
+UseHttps = $ves_https
+Username = $ves_user
+Password = $ves_pass
+SendEventInterval = $ves_interval
+ApiVersion = $ves_version
+KafkaPort = $ves_kafka_port
+KafkaBroker = $ves_kafka_host
+EOF
+
+# log "add guest.yaml measurements to host.yaml (enables actual host data)"
+# tail --lines=+24 guest.yaml >>host.yaml
+
+ log "start VES agent"
+ echo "$ves_kafka_host $ves_hostname">>/etc/hosts
+ nohup python ves_app.py --events-schema=$ves_mode.yaml --loglevel ERROR \
+ --config=ves_app_config.conf > /opt/ves/ves_app.stdout.log 2>&1 &
+ fi
}
function setup_collector() {
@@ -314,29 +335,19 @@ function setup_collector() {
log "install prerequistes"
sudo apt-get install -y jq
+ ves_hostname=$HOSTNAME
+ export ves_hostname
ves_host=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
export ves_host
setup_env
- echo "cleanup any earlier install attempts"
- sudo docker stop influxdb
- sudo docker rm influxdb
- sudo docker stop grafana
- sudo docker rm grafana
- sudo docker stop ves-collector
- sudo docker rm -v ves-collector
- sudo rm -rf /tmp/ves
-
- log "clone OPNFV VES"
- git clone https://gerrit.opnfv.org/gerrit/ves /tmp/ves
-
log "setup influxdb container"
- sudo docker run -d --name=influxdb -p 8086:8086 influxdb
- status=$(sudo docker inspect influxdb | jq -r '.[0].State.Status')
+ sudo docker run -d --name=ves-influxdb -p 8086:8086 influxdb
+ status=$(sudo docker inspect ves-influxdb | jq -r '.[0].State.Status')
while [[ "x$status" != "xrunning" ]]; do
log "InfluxDB container state is ($status)"
sleep 10
- status=$(sudo docker inspect influxdb | jq -r '.[0].State.Status')
+ status=$(sudo docker inspect ves-influxdb | jq -r '.[0].State.Status')
done
log "InfluxDB container state is $status"
@@ -351,23 +362,23 @@ function setup_collector() {
--data-urlencode "q=CREATE DATABASE veseventsdb"
log "install Grafana container"
- sudo docker run -d --name grafana -p 3000:3000 grafana/grafana
- status=$(sudo docker inspect grafana | jq -r '.[0].State.Status')
+ sudo docker run -d --name ves-grafana -p 3001:3000 grafana/grafana
+ status=$(sudo docker inspect ves-grafana | jq -r '.[0].State.Status')
while [[ "x$status" != "xrunning" ]]; do
log "Grafana container state is ($status)"
sleep 10
- status=$(sudo docker inspect grafana | jq -r '.[0].State.Status')
+ status=$(sudo docker inspect ves-grafana | jq -r '.[0].State.Status')
done
log "Grafana container state is $status"
log "wait for Grafana API to be active"
- while ! curl http://$ves_host:3000 ; do
+ while ! curl http://$ves_host:3001 ; do
log "Grafana API is not yet responding... waiting 10 seconds"
sleep 10
done
log "add VESEvents datasource to Grafana"
- cat <<EOF >datasource.json
+ cat <<EOF >/tmp/ves/datasource.json
{ "name":"VESEvents",
"type":"influxdb",
"access":"direct",
@@ -385,18 +396,19 @@ function setup_collector() {
EOF
curl -H "Accept: application/json" -H "Content-type: application/json" \
- -X POST -d @datasource.json \
- http://admin:admin@$ves_host:3000/api/datasources
+ -X POST -d @/tmp/ves/datasource.json \
+ http://admin:admin@$ves_host:3001/api/datasources
log "add VES dashboard to Grafana"
curl -H "Accept: application/json" -H "Content-type: application/json" \
-X POST \
- -d @/tmp/ves/tests/onap-demo/blueprints/tosca-vnfd-onap-demo/Dashboard.json\
- http://admin:admin@$ves_host:3000/api/dashboards/db
+ -d @/tmp/ves/tools/grafana/Dashboard.json\
+ http://admin:admin@$ves_host:3001/api/dashboards/db
log "setup collector container"
cd /tmp/ves
touch monitor.log
+ rm -rf /tmp/ves/evel-test-collector
git clone https://github.com/att/evel-test-collector.git
sed -i -- \
"s~log_file = /var/log/att/collector.log~log_file = /opt/ves/collector.log~" \
@@ -411,8 +423,10 @@ EOF
evel-test-collector/config/collector.conf
sed -i -- "s~vel_topic_name = example_vnf~vel_topic_name = $ves_topic~g" \
evel-test-collector/config/collector.conf
+ sed -i -- "/vel_topic_name = /a influxdb = $ves_host" \
+ evel-test-collector/config/collector.conf
- cp tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py \
+ cp /tmp/ves/tools/monitor.py \
evel-test-collector/code/collector/monitor.py
# Note below: python (2.7) is required due to dependency on module 'ConfigParser'
@@ -424,28 +438,46 @@ pip install requests
python /opt/ves/evel-test-collector/code/collector/monitor.py \
--config /opt/ves/evel-test-collector/config/collector.conf \
--influxdb $ves_host \
---section default > /opt/ves/monitor.log
+--section default > /opt/ves/monitor.log 2>&1 &
EOF
sudo docker run -it -d -v /tmp/ves:/opt/ves --name=ves-collector \
-p 30000:30000 ubuntu:xenial /bin/bash
- sudo docker exec -it -d ves-collector bash /opt/ves/setup-collector.sh
+ sudo docker exec ves-collector /bin/bash /opt/ves/setup-collector.sh
# debug hints
# sudo docker exec -it ves-collector apt-get install -y tcpdump
# sudo docker exec -it ves-collector tcpdump -A -v -s 0 -i any port 30000
# curl http://$ves_host:30000
# sudo docker exec -it ves-collector /bin/bash
- # ~/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic collectd
+ # ~/kafka_2.11-0.11.0.2/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic collectd
+}
+
+function clean() {
+ log "clean installation for $1 at $2"
+ if [[ "$1" == "master" ]]; then
+ cs="ves-agent ves-collector ves-grafana ves-influxdb"
+ for c in $cs; do
+ log "stop and remove container $c"
+ sudo docker stop $c
+ sudo docker rm -v $c
+ done
+ fi
+ log "remove collectd config for VES"
+ sudo sed -i -- '/VES plugin/,$d' /opt/collectd/etc/collectd.conf
+ sudo systemctl restart collectd
+ sudo rm -rf /tmp/ves
}
dist=`grep DISTRIB_ID /etc/*-release | awk -F '=' '{print $2}'`
-setup_env
if [[ $(grep -c $HOSTNAME /etc/hosts) -eq 0 ]]; then
echo "$(ip route get 8.8.8.8 | awk '{print $NF; exit}') $HOSTNAME" |\
sudo tee -a /etc/hosts
fi
case "$1" in
+ "collectd")
+ setup_collectd
+ ;;
"agent")
setup_agent
;;
@@ -455,6 +487,9 @@ case "$1" in
"kafka")
setup_kafka
;;
+ "clean")
+ clean $2 $3
+ ;;
*)
grep '#. ' $0
esac
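
The new clean target tears a demo node back down; per the case statement
above it takes the node role and address as arguments, e.g. (addresses
illustrative):

  # on the master: removes the ves-agent, ves-collector, ves-grafana, and
  # ves-influxdb containers, strips the VES section from collectd.conf,
  # restarts collectd, and deletes /tmp/ves
  bash /tmp/ves/tools/ves-setup.sh clean master 192.168.10.10

  # on a worker: collectd config cleanup and /tmp/ves removal only
  bash /tmp/ves/tools/ves-setup.sh clean worker 192.168.10.11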