From 6820172fcee7912f111c772ca11eb64c6b8a4342 Mon Sep 17 00:00:00 2001 From: Bryan Sullivan Date: Sun, 16 Oct 2016 21:55:48 -0700 Subject: Replaced collector.py with monitor.py JIRA: VES-1 Change-Id: Iacfa032786ebe9a2e475c858c582e92e820b277c Signed-off-by: Bryan Sullivan --- tests/blueprints/tosca-vnfd-hello-ves/monitor.py | 363 +++++++++++++++++++---- tests/vHello_VES.sh | 29 +- 2 files changed, 321 insertions(+), 71 deletions(-) diff --git a/tests/blueprints/tosca-vnfd-hello-ves/monitor.py b/tests/blueprints/tosca-vnfd-hello-ves/monitor.py index 372d586..5307435 100644 --- a/tests/blueprints/tosca-vnfd-hello-ves/monitor.py +++ b/tests/blueprints/tosca-vnfd-hello-ves/monitor.py @@ -19,74 +19,315 @@ # # Status: this is a work in progress, under test. +from wsgiref.simple_server import make_server, WSGIRequestHandler +import sys import os +import platform +import traceback import time -import sys +from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter +import ConfigParser +import logging.handlers +from base64 import b64decode +import string +import json +import jsonschema import select report_time = "" -request_rate = "" -app_state = "" -mode = "f" +requestRate = "" +vfStatus = "" +monitor_mode = "f" summary = "" status = "" +vfStatus = "" +base_url = '' +template_404 = b'''POST {0}''' +columns = 0 +rows = 0 + +class JSONObject: + def __init__(self, d): + self.__dict__ = d + +class NoLoggingWSGIRequestHandler(WSGIRequestHandler): + def log_message(self, format, *args): + pass def print_there(x, y, text): - sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text)) - sys.stdout.flush() - -a,b = os.popen('stty size', 'r').read().split() -columns = int(b) - -with open('/home/ubuntu/ves.log') as f: - while True: - if sys.stdin in select.select([sys.stdin], [], [], 0)[0]: - line = sys.stdin.readline() - if "f" in line: mode = "f" - if "c" in line: mode = "c" - # Update screen as the messed up the display! - print_there(1,columns-56,summary) - print_there(2,columns-56,status) - - line = f.readline() - if line: - if mode == "f": - print line, - - if "lastEpochMicrosec" in line: -#0....5....1....5....2....5....3....5....4....5....5 -# "lastEpochMicrosec": 1476552393091008, -# Note: the above is expected, but sometimes it's in a different position or -# corrupted with other output for some reason... - - fields = line.split( ) - e = fields[1][0:-1] - if e.isdigit(): -# print "report_time: ", e, "\n" - report_time = time.strftime('%Y-%m-%d %H:%M:%S', - time.localtime(int(e)/1000000)) - - if "requestRate" in line: -#....5....1....5....2....5....3....5 -# "requestRate": 2264, - request_rate = line[27:-2] - summary = report_time + " app state: " + app_state + ", request rate: " + request_rate - print_there(1,columns-56,summary) -#2016-10-16 17:15:29 app state: Started, request rate: 99 -#....5....1....5....2....5....3....5....4....5....5....5....6 - if mode == "c": print '{0} *** app state: {1}\trequest rate: {2}'.format( - report_time, app_state, request_rate) - - if "\"specificProblem\": \"Started\"" in line: - app_state = "Started" - status = report_time + " app state change: Started" - if mode == "c": print '{0} *** app state change: Started'.format(report_time) - - if "\"specificProblem\": \"Stopped\"" in line: - app_state = "Stopped" - status = report_time + " app state change: Stopped" - if mode == "c": print '{0} *** app state change: Stopped'.format(report_time) - - print_there(1,columns-56,summary) - print_there(2,columns-56,status) + sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text)) + sys.stdout.flush() + +base_url = '' +template_404 = b'''POST {0}''' + +def notfound_404(environ, start_response): + print('Unexpected URL/Method: {0} {1}'.format( + environ['REQUEST_METHOD'].upper(), + environ['PATH_INFO'])) + start_response('404 Not Found', [ ('Content-type', 'text/plain') ]) + return [template_404.format(base_url)] + +class PathDispatcher: + def __init__(self): + self.pathmap = { } + + def __call__(self, environ, start_response): + #---------------------------------------------------------------------- + # Extract the method and path from the environment. + #---------------------------------------------------------------------- + method = environ['REQUEST_METHOD'].lower() + path = environ['PATH_INFO'] + + #---------------------------------------------------------------------- + # See if we have a handler for this path, and if so invoke it. + # Otherwise, return a 404. + #---------------------------------------------------------------------- + handler = self.pathmap.get((method, path), notfound_404) + return handler(environ, start_response) + + def register(self, method, path, function): + print('Registering for {0} at {1}'.format(method, path)) + self.pathmap[method.lower(), path] = function + return function + +#-------------------------------------------------------------------------- +# Event processing +#-------------------------------------------------------------------------- +def process_event(e): + global status + global summary + global vfStatus + + epoch = e.event.commonEventHeader.lastEpochMicrosec + + report_time = time.strftime('%Y-%m-%d %H:%M:%S', + time.localtime(int(epoch)/1000000)) + + domain = e.event.commonEventHeader.domain + + if domain == 'measurementsForVfScaling': + + aggregateCpuUsage = e.event.measurementsForVfScaling.aggregateCpuUsage + requestRate = e.event.measurementsForVfScaling.requestRate + summary = report_time + " app state: " + vfStatus + ", request rate: " + str(requestRate) + if monitor_mode == "c": print '{0} *** app state: {1}\trequest rate: {2}'.format( + report_time, vfStatus, str(requestRate)) + + if domain == 'fault': + + alarmCondition = e.event.faultFields.alarmCondition + specificProblem = e.event.faultFields.specificProblem +# vfStatus = e.event.faultFields.vfStatus + vfStatus = e.event.faultFields.specificProblem + + status = report_time + " app state change: " + specificProblem + if monitor_mode == "c": print '{0} *** vfStatus change: {1}'.format(report_time, + specificProblem) + +# print_there only works if SSH'd to the VM manually - need to investigate +# print_there(1,columns-56,summary) + print '{0}'.format(summary) +# print_there(2,columns-56,status) + print '{0}'.format(status) + +#-------------------------------------------------------------------------- +# Main monitoring and logging procedure +#-------------------------------------------------------------------------- +def ves_monitor(environ, start_response): + + # Check for keyboard input + if sys.stdin in select.select([sys.stdin], [], [], 0)[0]: + line = sys.stdin.readline() + if "f" in line: monitor_mode = "f" + if "c" in line: monitor_mode = "c" + + print('==== ' + time.asctime() + ' ' + '=' * 49) + + #-------------------------------------------------------------------------- + # Extract the content from the request. + #-------------------------------------------------------------------------- + length = int(environ.get('CONTENT_LENGTH', '0')) + body = environ['wsgi.input'].read(length) + + mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION', + 'None None')) + if (b64_credentials != 'None'): + credentials = b64decode(b64_credentials) + else: + credentials = None + + #-------------------------------------------------------------------------- + # See whether the user authenticated themselves correctly. + #-------------------------------------------------------------------------- + if (credentials == (vel_username + ':' + vel_password)): + start_response('204 No Content', []) + yield '' + else: + print('Failed to authenticate agent') + start_response('401 Unauthorized', [ ('Content-type', + 'application/json')]) + req_error = { 'requestError': { + 'policyException': { + 'messageId': 'POL0001', + 'text': 'Failed to authenticate' + } + } + } + yield json.dumps(req_error) + + #-------------------------------------------------------------------------- + # Decode the JSON body + #-------------------------------------------------------------------------- + + try: + decoded_body = json.loads(body) + print('{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + decoded_body = json.loads(body, object_hook=JSONObject) + process_event(decoded_body) + + except Exception as e: + print('JSON body is not valid for unexpected reason! {0}'.format(e)) + +def main(argv=None): + global columns + global rows + a,b = os.popen('stty size', 'r').read().split() + rows = int(a) + columns = int(b) + + if argv is None: + argv = sys.argv + else: + sys.argv.extend(argv) + + try: + #---------------------------------------------------------------------- + # Setup argument parser so we can parse the command-line. + #---------------------------------------------------------------------- + parser = ArgumentParser(description='', + formatter_class=ArgumentDefaultsHelpFormatter) + parser.add_argument('-v', '--verbose', + dest='verbose', + action='count', + help='set verbosity level') + parser.add_argument('-V', '--version', + action='version', + version='1.0', + help='Display version information') + parser.add_argument('-c', '--config', + dest='config', + default='/etc/opt/att/collector.conf', + help='Use this config file.', + metavar='') + parser.add_argument('-s', '--section', + dest='section', + default='default', + metavar='
', + help='section to use in the config file') + + #---------------------------------------------------------------------- + # Process arguments received. + #---------------------------------------------------------------------- + args = parser.parse_args() + verbose = args.verbose + config_file = args.config + config_section = args.section + #---------------------------------------------------------------------- + # Now read the config file, using command-line supplied values as + # overrides. + #---------------------------------------------------------------------- + defaults = {'log_file': 'ves.log', + 'vel_port': '30000', + 'vel_path': '', + 'vel_topic_name': '' + } + overrides = {} + config = ConfigParser.SafeConfigParser(defaults) + config.read(config_file) + + #---------------------------------------------------------------------- + # extract the values we want. + #---------------------------------------------------------------------- + log_file = config.get(config_section, 'log_file', vars=overrides) + vel_port = config.get(config_section, 'vel_port', vars=overrides) + vel_path = config.get(config_section, 'vel_path', vars=overrides) + vel_topic_name = config.get(config_section, + 'vel_topic_name', + vars=overrides) + global vel_username + global vel_password + vel_username = config.get(config_section, + 'vel_username', + vars=overrides) + vel_password = config.get(config_section, + 'vel_password', + vars=overrides) + vel_schema_file = config.get(config_section, + 'schema_file', + vars=overrides) + base_schema_file = config.get(config_section, + 'base_schema_file', + vars=overrides) + + #---------------------------------------------------------------------- + # Perform some basic error checking on the config. + #---------------------------------------------------------------------- + if (int(vel_port) < 1024 or int(vel_port) > 65535): + raise RuntimeError('Invalid Vendor Event Listener port ({0}) ' + 'specified'.format(vel_port)) + + if (len(vel_path) > 0 and vel_path[-1] != '/'): + vel_path += '/' + + #---------------------------------------------------------------------- + # Load up the vel_schema and base_schema, if they exist. + #---------------------------------------------------------------------- + if (os.path.exists(vel_schema_file)): + global vel_schema + vel_schema = json.load(open(vel_schema_file, 'r')) + if (os.path.exists(base_schema_file)): + base_schema = json.load(open(base_schema_file, 'r')) + vel_schema.update(base_schema) + + #---------------------------------------------------------------------- + # We are now ready to get started with processing. Start-up the various + # components of the system in order: + # + # 1) Create the dispatcher. + # 2) Register the functions for the URLs of interest. + # 3) Run the webserver. + #---------------------------------------------------------------------- + root_url = '/{0}eventListener/v{1}{2}'.format(vel_path, + '1', + '/' + vel_topic_name + if len(vel_topic_name) > 0 + else '') + + base_url = root_url + dispatcher = PathDispatcher() + dispatcher.register('GET', root_url, ves_monitor) + dispatcher.register('POST', root_url, ves_monitor) + httpd = make_server('', 30000, dispatcher, handler_class=NoLoggingWSGIRequestHandler) + httpd.serve_forever() + + return 0 + + except Exception as e: + #---------------------------------------------------------------------- + # Handle unexpected exceptions. + #---------------------------------------------------------------------- + indent = len('VES Monitor') * ' ' + sys.stderr.write('VES Monitor: ' + repr(e) + '\n') + sys.stderr.write(indent + ' for help use --help\n') + sys.stderr.write(traceback.format_exc()) + return 2 +#------------------------------------------------------------------------------ +# MAIN SCRIPT ENTRY POINT. +#------------------------------------------------------------------------------ +if __name__ == '__main__': + sys.exit(main()) diff --git a/tests/vHello_VES.sh b/tests/vHello_VES.sh index abb9687..6bc6bbf 100644 --- a/tests/vHello_VES.sh +++ b/tests/vHello_VES.sh @@ -22,7 +22,7 @@ # $ git clone https://gerrit.opnfv.org/gerrit/ves # $ cd ves/tests # $ bash vHello_VES.sh [setup|start|run|test|stop|clean] -# [collector|traffic|pause|nic] +# [monitor|traffic|pause|nic] # setup: setup test environment # start: install blueprint and run test # run: setup test environment and run test @@ -30,7 +30,7 @@ # stop: stop test and uninstall blueprint # clean: cleanup after test # Test: -# collector: attach to the collector VM and run the collector +# monitor: attach to the collector VM and run the VES Monitor # traffic: generate some traffic # pause: pause the VNF (web server) for a minute to generate a state change # nic: timed ifdown/ifup to generate a NIC fault report @@ -228,6 +228,8 @@ git clone https://github.com/att/evel-test-collector.git sed -i -- 's/vel_username = /vel_username = hello/' evel-test-collector/config/collector.conf sed -i -- 's/vel_password = /vel_password = world/' evel-test-collector/config/collector.conf EOF + # Replacing the default collector with monitor.py which has processing logic as well + scp -i /tmp/tacker/vHello.pem -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no /tmp/tacker/blueprints/tosca-vnfd-hello-ves/monitor.py ubuntu@$VDU2_IP:/home/ubuntu/monitor.py echo "$0: start vHello web server in VDU1" ssh -i /tmp/tacker/vHello.pem -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$VDU1_IP "sudo chown ubuntu /home/ubuntu" @@ -270,19 +272,15 @@ get_vdu_ip () { vdu_ip=$(openstack server list | awk "/$1/ { print \$10 }") } -collector () { - echo "$0: Start the VES Collector in VDU2 - Stop first if running" +monitor () { + echo "$0: Start the VES Monitor in VDU2 - Stop first if running" get_vdu_ip VDU2 sudo cp /tmp/tacker/vHello.pem /tmp/vHello.pem sudo chown $USER:$USER /tmp/vHello.pem chmod 600 /tmp/vHello.pem - ssh -i /tmp/vHello.pem -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$vdu_ip << 'EOF' + ssh -t -t -i /tmp/vHello.pem -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$vdu_ip << 'EOF' sudo kill $(ps -ef | grep evel-test-collector | awk '{print $2}') -cd /home/ubuntu/ -nohup python evel-test-collector/code/collector/collector.py \ - --config evel-test-collector/config/collector.conf \ - --section default \ - --verbose >~/ves.log & +python monitor.py --config evel-test-collector/config/collector.conf --section default EOF } @@ -350,5 +348,16 @@ case "$1" in echo "run: setup test environment and run test" echo "stop: stop test and uninstall blueprint" echo "clean: cleanup after test" + echo "usage: bash vHello_VES.sh [setup|start|run|test|stop|clean] [monitor|traffic|pause|nic]" + echo "setup: setup test environment" + echo "start: install blueprint and run test" + echo "run: setup test environment and run test" + echo "test: run test tools/scenario - see below" + echo "stop: stop test and uninstall blueprint" + echo "clean: cleanup after test" + echo "Test:" + echo " monitor: attach to the collector VM and run the VES Monitor" + echo " traffic: generate some traffic" + echo " pause: pause the VNF (web server) for a minute to generate a state change" fail esac -- cgit 1.2.3-korg