summaryrefslogtreecommitdiffstats
path: root/tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py')
-rw-r--r--tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py168
1 files changed, 116 insertions, 52 deletions
diff --git a/tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py b/tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py
index 9b16af3..8784ca4 100644
--- a/tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py
+++ b/tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py
@@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-# What this is: Monitor and closed-loop policy agent as part of the OPNFV VES
-# vHello_VES demo.
+# What this is: Monitor and closed-loop policy agent as part of the OPNFV VES
+# ves_onap_demo.
#
# Status: this is a work in progress, under test.
@@ -34,9 +34,10 @@ import string
import json
import jsonschema
from functools import partial
+import requests
monitor_mode = "f"
-vdu_id = ['','','','']
+vdu_id = ['','','','','','']
summary_e = ['***** Summary of key stats *****','','','']
summary_c = ['Collectd agents:']
status = ['','Started','Started','Started']
@@ -49,27 +50,6 @@ class JSONObject:
def __init__(self, d):
self.__dict__ = d
-_hello_resp = '''\
-<html>
- <head>
- <title>Hello {name}</title>
- </head>
- <body>
- <h1>Hello {name}!</h1>
- </body>
-</html>'''
-
-_localtime_resp = '''\
-<?xml version="1.0"?>
-<time>
- <year>{t.tm_year}</year>
- <month>{t.tm_mon}</month>
- <day>{t.tm_mday}</day>
- <hour>{t.tm_hour}</hour>
- <minute>{t.tm_min}</minute>
- <second>{t.tm_sec}</second>
-</time>'''
-
__all__ = []
__version__ = 0.1
__date__ = '2015-12-04'
@@ -197,7 +177,7 @@ def listener(environ, start_response, schema):
#--------------------------------------------------------------------------
if (credentials == (vel_username + ':' + vel_password)):
logger.debug('Authenticated OK')
- print('Authenticated OK')
+# print('Authenticated OK')
#----------------------------------------------------------------------
# Respond to the caller. If we have a pending commandList from the
@@ -222,8 +202,9 @@ def listener(environ, start_response, schema):
start_response('202 Accepted', [])
yield ''
else:
- logger.warn('Failed to authenticate OK')
- print('Failed to authenticate agent credentials: ', credentials)
+ logger.warn('Failed to authenticate OK; creds: ' + credentials)
+ print('Failed to authenticate agent credentials: ', credentials,
+ 'against expected ', vel_username, ':', vel_password)
#----------------------------------------------------------------------
# Respond to the caller.
@@ -239,9 +220,90 @@ def listener(environ, start_response, schema):
}
yield json.dumps(req_error)
+ save_event(body)
process_event(body)
#--------------------------------------------------------------------------
+# Send event to influxdb
+#--------------------------------------------------------------------------
+def save_event(body):
+ jobj = json.loads(body)
+ e = json.loads(body, object_hook=JSONObject)
+
+ domain = jobj['event']['commonEventHeader']['domain']
+ timestamp = jobj['event']['commonEventHeader']['lastEpochMicrosec']
+ agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper( )
+ if "VIRT" in agent:
+ agent = "computehost"
+ if "VDU1" in agent:
+ agent = "webserver_1"
+ if "VDU2" in agent:
+ agent = "webserver_2"
+ if "VDU3" in agent:
+ agent = "loadbalancer"
+ if "VDU4" in agent:
+ agent = "firewall"
+
+ if e.event.commonEventHeader.domain == "heartbeat":
+ print('Found Heartbeat')
+ pdata = 'heartbeat,system={} sequence={}'.format(agent,e.event.commonEventHeader.sequence)
+ print(pdata)
+ r = requests.post("http://localhost:8086/write?db=veseventsdb", data=pdata, headers={'Content-Type': 'text/plain'})
+ if r.status_code != 204:
+ print('*** Failed to add cpu event to influxdb ***')
+
+ if 'measurementsForVfScalingFields' in jobj['event']:
+ print('Found measurementsForVfScalingFields')
+ if 'cpuUsageArray' in jobj['event']['measurementsForVfScalingFields']:
+ if 'additionalFields' in jobj['event']['measurementsForVfScalingFields']:
+ for f in e.event.measurementsForVfScalingFields.additionalFields:
+ if f.name == "cpu-aggregation-cpu-average-user-percent-value":
+ aggregateCpuUsageUser = f.value
+ if f.name == "cpu-aggregation-cpu-average-system-percent-value":
+ aggregateCpuUsageSystem = f.value
+ if f.name == "cpu-aggregation-cpu-average-idle-percent-value":
+ aggregateCpuIdle = f.value
+ else:
+ a = e.event.measurementsForVfScalingFields.cpuUsageArray[0]
+ aggregateCpuUsageUser = a.cpuUsageUser
+ aggregateCpuUsageSystem = a.cpuUsageSystem
+ aggregateCpuIdle = a.cpuIdle
+
+ pdata = 'cpu,system={},cpuid={} cpuuser={},cpusystem={},cpuidle={}'.format(agent,jobj['event']['measurementsForVfScalingFields']['cpuUsageArray'][0]['cpuIdentifier'], aggregateCpuUsageUser,aggregateCpuUsageSystem,aggregateCpuIdle)
+ print(pdata)
+ r = requests.post("http://localhost:8086/write?db=veseventsdb", data=pdata, headers={'Content-Type': 'text/plain'})
+ if r.status_code != 204:
+ print('*** Failed to add cpu event to influxdb ***')
+
+ if 'vNicPerformanceArray' in jobj['event']['measurementsForVfScalingFields']:
+ print('Found vNicPerformanceArray')
+ vnicn = 1
+ for vnic in e.event.measurementsForVfScalingFields.vNicPerformanceArray:
+ pdata = 'vnic,system={},vnicn={},vnicid={} txoctets={},rxpacketsacc={},rxoctetsacc={},txpacketacc={}'.format(agent,vnicn,vnic.vNicIdentifier,vnic.transmittedOctetsAccumulated,vnic.receivedTotalPacketsAccumulated,vnic.receivedOctetsAccumulated,vnic.transmittedTotalPacketsAccumulated)
+ print(pdata)
+ r = requests.post("http://localhost:8086/write?db=veseventsdb", data=pdata, headers={'Content-Type': 'text/plain'})
+ if r.status_code != 204:
+ print('*** Failed to add vnic event to influxdb ***')
+ vnicn = vnicn + 1
+
+ if 'vNicUsageArray' in jobj['event']['measurementsForVfScalingFields']:
+ print('Found vNicUsageArray')
+ vnic = e.event.measurementsForVfScalingFields.vNicUsageArray[0]
+ pdata = 'vnic,system={},vnicid={} txoctets={},rxpacketsacc={},rxoctetsacc={},txpacketacc={}'.format(agent,vnic.vNicIdentifier,vnic.transmittedOctetsAccumulated,vnic.receivedTotalPacketsAccumulated,vnic.receivedOctetsAccumulated,vnic.transmittedTotalPacketsAccumulated)
+ print(pdata)
+ r = requests.post("http://localhost:8086/write?db=veseventsdb", data=pdata, headers={'Content-Type': 'text/plain'})
+ if r.status_code != 204:
+ print('*** Failed to add vnic event to influxdb ***')
+
+ if 'requestRate' in jobj['event']['measurementsForVfScalingFields']:
+ print('Found requestRate')
+ pdata = 'http,system={} httptxrx={}'.format(agent,e.event.measurementsForVfScalingFields.requestRate)
+ print(pdata)
+ r = requests.post("http://localhost:8086/write?db=veseventsdb", data=pdata, headers={'Content-Type': 'text/plain'})
+ if r.status_code != 204:
+ print('*** Failed to add http event to influxdb ***')
+
+#--------------------------------------------------------------------------
# Event processing
#--------------------------------------------------------------------------
def process_event(body):
@@ -256,42 +318,44 @@ def process_event(body):
epoch = e.event.commonEventHeader.lastEpochMicrosec
sourceId = e.event.commonEventHeader.sourceId
- report_time = time.strftime('%Y-%m-%d %H:%M:%S',
+ report_time = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(int(epoch)/1000000))
- host = e.event.commonEventHeader.reportingEntityName
+ host = e.event.commonEventHeader.sourceName
if 'VDU1' in host or 'vdu1' in host: vdu = 1
if 'VDU2' in host or 'vdu2' in host: vdu = 2
if 'VDU3' in host or 'vdu3' in host: vdu = 3
-
+ if 'VDU4' in host or 'vdu4' in host: vdu = 4
+
domain = e.event.commonEventHeader.domain
if domain == 'measurementsForVfScaling':
- if vdu >= 1:
+ if vdu <= 2:
requestRate = e.event.measurementsForVfScalingFields.requestRate
summary_e[vdu] = host + ": state=" + status[vdu] + ", tps=" + str(requestRate)
- else:
- aggregateCpuUsage = e.event.measurementsForVfScalingFields.aggregateCpuUsage
- vNicUsageArray = e.event.measurementsForVfScalingFields.vNicUsageArray
- s = ""
- for i in range(1,len(vdu_id)):
- if sourceId.upper() in vdu_id[i].upper():
- s = "(VDU"+ str(i) + ") "
- if s:
- s += host + ": cpu=" + str(aggregateCpuUsage)
- found = False
- for i in range(1,len(summary_c)):
- if host in summary_c[i]:
- summary_c[i] = s
- found = True
- break
- if not found:
- summary_c.extend([s])
+# else:
+# for f in e.event.measurementsForVfScalingFields.additionalFields:
+# if f.name == "cpu-aggregation-cpu-average-idle-percent-value":
+# aggregateCpuUsage = 100 - float(f.value)
+# break
+# s = ""
+# for i in range(1,len(vdu_id)):
+# if sourceName.upper() in vdu_id[i].upper():
+# s = " (VDU"+ str(i) + ") "
+# s = host + ": cpu=" + aggregateCpuUsage
+# found = False
+# for c in summary_c:
+# if host in c:
+# c = s
+# found = True
+# break
+# if not found:
+# summary_c.append(s)
for s in summary_e:
print '{0}'.format(s)
- for s in summary_c:
- print '{0}'.format(s)
+# for s in summary_c:
+# print '{0}'.format(s)
if domain == 'fault' and vdu >= 1:
alarmCondition = e.event.faultFields.alarmCondition
@@ -448,7 +512,7 @@ USAGE
help='Display version information')
parser.add_argument('-a', '--api-version',
dest='api_version',
- default='3',
+ default='5',
help='set API version')
parser.add_argument('-c', '--config',
dest='config',
@@ -518,7 +582,7 @@ USAGE
#----------------------------------------------------------------------
global logger
print('Logfile: {0}'.format(log_file))
- logger = logging.getLogger('collector')
+ logger = logging.getLogger('monitor')
if verbose > 0:
print('Verbose mode on')
logger.setLevel(logging.DEBUG)