summaryrefslogtreecommitdiffstats
path: root/tests/blueprints/tosca-vnfd-hello-ves
diff options
context:
space:
mode:
Diffstat (limited to 'tests/blueprints/tosca-vnfd-hello-ves')
-rw-r--r--tests/blueprints/tosca-vnfd-hello-ves/monitor.py363
1 files changed, 302 insertions, 61 deletions
diff --git a/tests/blueprints/tosca-vnfd-hello-ves/monitor.py b/tests/blueprints/tosca-vnfd-hello-ves/monitor.py
index 372d586..5307435 100644
--- a/tests/blueprints/tosca-vnfd-hello-ves/monitor.py
+++ b/tests/blueprints/tosca-vnfd-hello-ves/monitor.py
@@ -19,74 +19,315 @@
#
# Status: this is a work in progress, under test.
+from wsgiref.simple_server import make_server, WSGIRequestHandler
+import sys
import os
+import platform
+import traceback
import time
-import sys
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import ConfigParser
+import logging.handlers
+from base64 import b64decode
+import string
+import json
+import jsonschema
import select
report_time = ""
-request_rate = ""
-app_state = ""
-mode = "f"
+requestRate = ""
+vfStatus = ""
+monitor_mode = "f"
summary = ""
status = ""
+vfStatus = ""
+base_url = ''
+template_404 = b'''POST {0}'''
+columns = 0
+rows = 0
+
+class JSONObject:
+ def __init__(self, d):
+ self.__dict__ = d
+
+class NoLoggingWSGIRequestHandler(WSGIRequestHandler):
+ def log_message(self, format, *args):
+ pass
def print_there(x, y, text):
- sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text))
- sys.stdout.flush()
-
-a,b = os.popen('stty size', 'r').read().split()
-columns = int(b)
-
-with open('/home/ubuntu/ves.log') as f:
- while True:
- if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
- line = sys.stdin.readline()
- if "f" in line: mode = "f"
- if "c" in line: mode = "c"
- # Update screen as the <cr> messed up the display!
- print_there(1,columns-56,summary)
- print_there(2,columns-56,status)
-
- line = f.readline()
- if line:
- if mode == "f":
- print line,
-
- if "lastEpochMicrosec" in line:
-#0....5....1....5....2....5....3....5....4....5....5
-# "lastEpochMicrosec": 1476552393091008,
-# Note: the above is expected, but sometimes it's in a different position or
-# corrupted with other output for some reason...
-
- fields = line.split( )
- e = fields[1][0:-1]
- if e.isdigit():
-# print "report_time: ", e, "\n"
- report_time = time.strftime('%Y-%m-%d %H:%M:%S',
- time.localtime(int(e)/1000000))
-
- if "requestRate" in line:
-#....5....1....5....2....5....3....5
-# "requestRate": 2264,
- request_rate = line[27:-2]
- summary = report_time + " app state: " + app_state + ", request rate: " + request_rate
- print_there(1,columns-56,summary)
-#2016-10-16 17:15:29 app state: Started, request rate: 99
-#....5....1....5....2....5....3....5....4....5....5....5....6
- if mode == "c": print '{0} *** app state: {1}\trequest rate: {2}'.format(
- report_time, app_state, request_rate)
-
- if "\"specificProblem\": \"Started\"" in line:
- app_state = "Started"
- status = report_time + " app state change: Started"
- if mode == "c": print '{0} *** app state change: Started'.format(report_time)
-
- if "\"specificProblem\": \"Stopped\"" in line:
- app_state = "Stopped"
- status = report_time + " app state change: Stopped"
- if mode == "c": print '{0} *** app state change: Stopped'.format(report_time)
-
- print_there(1,columns-56,summary)
- print_there(2,columns-56,status)
+ sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text))
+ sys.stdout.flush()
+
+base_url = ''
+template_404 = b'''POST {0}'''
+
+def notfound_404(environ, start_response):
+ print('Unexpected URL/Method: {0} {1}'.format(
+ environ['REQUEST_METHOD'].upper(),
+ environ['PATH_INFO']))
+ start_response('404 Not Found', [ ('Content-type', 'text/plain') ])
+ return [template_404.format(base_url)]
+
+class PathDispatcher:
+ def __init__(self):
+ self.pathmap = { }
+
+ def __call__(self, environ, start_response):
+ #----------------------------------------------------------------------
+ # Extract the method and path from the environment.
+ #----------------------------------------------------------------------
+ method = environ['REQUEST_METHOD'].lower()
+ path = environ['PATH_INFO']
+
+ #----------------------------------------------------------------------
+ # See if we have a handler for this path, and if so invoke it.
+ # Otherwise, return a 404.
+ #----------------------------------------------------------------------
+ handler = self.pathmap.get((method, path), notfound_404)
+ return handler(environ, start_response)
+
+ def register(self, method, path, function):
+ print('Registering for {0} at {1}'.format(method, path))
+ self.pathmap[method.lower(), path] = function
+ return function
+
+#--------------------------------------------------------------------------
+# Event processing
+#--------------------------------------------------------------------------
+def process_event(e):
+ global status
+ global summary
+ global vfStatus
+
+ epoch = e.event.commonEventHeader.lastEpochMicrosec
+
+ report_time = time.strftime('%Y-%m-%d %H:%M:%S',
+ time.localtime(int(epoch)/1000000))
+
+ domain = e.event.commonEventHeader.domain
+
+ if domain == 'measurementsForVfScaling':
+
+ aggregateCpuUsage = e.event.measurementsForVfScaling.aggregateCpuUsage
+ requestRate = e.event.measurementsForVfScaling.requestRate
+ summary = report_time + " app state: " + vfStatus + ", request rate: " + str(requestRate)
+ if monitor_mode == "c": print '{0} *** app state: {1}\trequest rate: {2}'.format(
+ report_time, vfStatus, str(requestRate))
+
+ if domain == 'fault':
+
+ alarmCondition = e.event.faultFields.alarmCondition
+ specificProblem = e.event.faultFields.specificProblem
+# vfStatus = e.event.faultFields.vfStatus
+ vfStatus = e.event.faultFields.specificProblem
+
+ status = report_time + " app state change: " + specificProblem
+ if monitor_mode == "c": print '{0} *** vfStatus change: {1}'.format(report_time,
+ specificProblem)
+
+# print_there only works if SSH'd to the VM manually - need to investigate
+# print_there(1,columns-56,summary)
+ print '{0}'.format(summary)
+# print_there(2,columns-56,status)
+ print '{0}'.format(status)
+
+#--------------------------------------------------------------------------
+# Main monitoring and logging procedure
+#--------------------------------------------------------------------------
+def ves_monitor(environ, start_response):
+
+ # Check for keyboard input
+ if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
+ line = sys.stdin.readline()
+ if "f" in line: monitor_mode = "f"
+ if "c" in line: monitor_mode = "c"
+
+ print('==== ' + time.asctime() + ' ' + '=' * 49)
+
+ #--------------------------------------------------------------------------
+ # Extract the content from the request.
+ #--------------------------------------------------------------------------
+ length = int(environ.get('CONTENT_LENGTH', '0'))
+ body = environ['wsgi.input'].read(length)
+
+ mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION',
+ 'None None'))
+ if (b64_credentials != 'None'):
+ credentials = b64decode(b64_credentials)
+ else:
+ credentials = None
+
+ #--------------------------------------------------------------------------
+ # See whether the user authenticated themselves correctly.
+ #--------------------------------------------------------------------------
+ if (credentials == (vel_username + ':' + vel_password)):
+ start_response('204 No Content', [])
+ yield ''
+ else:
+ print('Failed to authenticate agent')
+ start_response('401 Unauthorized', [ ('Content-type',
+ 'application/json')])
+ req_error = { 'requestError': {
+ 'policyException': {
+ 'messageId': 'POL0001',
+ 'text': 'Failed to authenticate'
+ }
+ }
+ }
+ yield json.dumps(req_error)
+
+ #--------------------------------------------------------------------------
+ # Decode the JSON body
+ #--------------------------------------------------------------------------
+
+ try:
+ decoded_body = json.loads(body)
+ print('{0}'.format(json.dumps(decoded_body,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
+ decoded_body = json.loads(body, object_hook=JSONObject)
+ process_event(decoded_body)
+
+ except Exception as e:
+ print('JSON body is not valid for unexpected reason! {0}'.format(e))
+
+def main(argv=None):
+ global columns
+ global rows
+ a,b = os.popen('stty size', 'r').read().split()
+ rows = int(a)
+ columns = int(b)
+
+ if argv is None:
+ argv = sys.argv
+ else:
+ sys.argv.extend(argv)
+
+ try:
+ #----------------------------------------------------------------------
+ # Setup argument parser so we can parse the command-line.
+ #----------------------------------------------------------------------
+ parser = ArgumentParser(description='',
+ formatter_class=ArgumentDefaultsHelpFormatter)
+ parser.add_argument('-v', '--verbose',
+ dest='verbose',
+ action='count',
+ help='set verbosity level')
+ parser.add_argument('-V', '--version',
+ action='version',
+ version='1.0',
+ help='Display version information')
+ parser.add_argument('-c', '--config',
+ dest='config',
+ default='/etc/opt/att/collector.conf',
+ help='Use this config file.',
+ metavar='<file>')
+ parser.add_argument('-s', '--section',
+ dest='section',
+ default='default',
+ metavar='<section>',
+ help='section to use in the config file')
+
+ #----------------------------------------------------------------------
+ # Process arguments received.
+ #----------------------------------------------------------------------
+ args = parser.parse_args()
+ verbose = args.verbose
+ config_file = args.config
+ config_section = args.section
+ #----------------------------------------------------------------------
+ # Now read the config file, using command-line supplied values as
+ # overrides.
+ #----------------------------------------------------------------------
+ defaults = {'log_file': 'ves.log',
+ 'vel_port': '30000',
+ 'vel_path': '',
+ 'vel_topic_name': ''
+ }
+ overrides = {}
+ config = ConfigParser.SafeConfigParser(defaults)
+ config.read(config_file)
+
+ #----------------------------------------------------------------------
+ # extract the values we want.
+ #----------------------------------------------------------------------
+ log_file = config.get(config_section, 'log_file', vars=overrides)
+ vel_port = config.get(config_section, 'vel_port', vars=overrides)
+ vel_path = config.get(config_section, 'vel_path', vars=overrides)
+ vel_topic_name = config.get(config_section,
+ 'vel_topic_name',
+ vars=overrides)
+ global vel_username
+ global vel_password
+ vel_username = config.get(config_section,
+ 'vel_username',
+ vars=overrides)
+ vel_password = config.get(config_section,
+ 'vel_password',
+ vars=overrides)
+ vel_schema_file = config.get(config_section,
+ 'schema_file',
+ vars=overrides)
+ base_schema_file = config.get(config_section,
+ 'base_schema_file',
+ vars=overrides)
+
+ #----------------------------------------------------------------------
+ # Perform some basic error checking on the config.
+ #----------------------------------------------------------------------
+ if (int(vel_port) < 1024 or int(vel_port) > 65535):
+ raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
+ 'specified'.format(vel_port))
+
+ if (len(vel_path) > 0 and vel_path[-1] != '/'):
+ vel_path += '/'
+
+ #----------------------------------------------------------------------
+ # Load up the vel_schema and base_schema, if they exist.
+ #----------------------------------------------------------------------
+ if (os.path.exists(vel_schema_file)):
+ global vel_schema
+ vel_schema = json.load(open(vel_schema_file, 'r'))
+ if (os.path.exists(base_schema_file)):
+ base_schema = json.load(open(base_schema_file, 'r'))
+ vel_schema.update(base_schema)
+
+ #----------------------------------------------------------------------
+ # We are now ready to get started with processing. Start-up the various
+ # components of the system in order:
+ #
+ # 1) Create the dispatcher.
+ # 2) Register the functions for the URLs of interest.
+ # 3) Run the webserver.
+ #----------------------------------------------------------------------
+ root_url = '/{0}eventListener/v{1}{2}'.format(vel_path,
+ '1',
+ '/' + vel_topic_name
+ if len(vel_topic_name) > 0
+ else '')
+
+ base_url = root_url
+ dispatcher = PathDispatcher()
+ dispatcher.register('GET', root_url, ves_monitor)
+ dispatcher.register('POST', root_url, ves_monitor)
+ httpd = make_server('', 30000, dispatcher, handler_class=NoLoggingWSGIRequestHandler)
+ httpd.serve_forever()
+
+ return 0
+
+ except Exception as e:
+ #----------------------------------------------------------------------
+ # Handle unexpected exceptions.
+ #----------------------------------------------------------------------
+ indent = len('VES Monitor') * ' '
+ sys.stderr.write('VES Monitor: ' + repr(e) + '\n')
+ sys.stderr.write(indent + ' for help use --help\n')
+ sys.stderr.write(traceback.format_exc())
+ return 2
+#------------------------------------------------------------------------------
+# MAIN SCRIPT ENTRY POINT.
+#------------------------------------------------------------------------------
+if __name__ == '__main__':
+ sys.exit(main())