summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Sullivan <bryan.sullivan@att.com>2016-10-16 21:55:48 -0700
committerBryan Sullivan <bryan.sullivan@att.com>2016-10-16 21:55:48 -0700
commit6820172fcee7912f111c772ca11eb64c6b8a4342 (patch)
tree15e892abba4806882399bcbd4ee6026fb825943e
parenta36ff3c3680c8eb5d8c291e7a6e237780ca34937 (diff)
Replaced collector.py with monitor.py
JIRA: VES-1 Change-Id: Iacfa032786ebe9a2e475c858c582e92e820b277c Signed-off-by: Bryan Sullivan <bryan.sullivan@att.com>
-rw-r--r--tests/blueprints/tosca-vnfd-hello-ves/monitor.py363
-rw-r--r--tests/vHello_VES.sh29
2 files changed, 321 insertions, 71 deletions
diff --git a/tests/blueprints/tosca-vnfd-hello-ves/monitor.py b/tests/blueprints/tosca-vnfd-hello-ves/monitor.py
index 372d586..5307435 100644
--- a/tests/blueprints/tosca-vnfd-hello-ves/monitor.py
+++ b/tests/blueprints/tosca-vnfd-hello-ves/monitor.py
@@ -19,74 +19,315 @@
#
# Status: this is a work in progress, under test.
+from wsgiref.simple_server import make_server, WSGIRequestHandler
+import sys
import os
+import platform
+import traceback
import time
-import sys
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import ConfigParser
+import logging.handlers
+from base64 import b64decode
+import string
+import json
+import jsonschema
import select
report_time = ""
-request_rate = ""
-app_state = ""
-mode = "f"
+requestRate = ""
+vfStatus = ""
+monitor_mode = "f"
summary = ""
status = ""
+vfStatus = ""
+base_url = ''
+template_404 = b'''POST {0}'''
+columns = 0
+rows = 0
+
+class JSONObject:
+ def __init__(self, d):
+ self.__dict__ = d
+
+class NoLoggingWSGIRequestHandler(WSGIRequestHandler):
+ def log_message(self, format, *args):
+ pass
def print_there(x, y, text):
- sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text))
- sys.stdout.flush()
-
-a,b = os.popen('stty size', 'r').read().split()
-columns = int(b)
-
-with open('/home/ubuntu/ves.log') as f:
- while True:
- if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
- line = sys.stdin.readline()
- if "f" in line: mode = "f"
- if "c" in line: mode = "c"
- # Update screen as the <cr> messed up the display!
- print_there(1,columns-56,summary)
- print_there(2,columns-56,status)
-
- line = f.readline()
- if line:
- if mode == "f":
- print line,
-
- if "lastEpochMicrosec" in line:
-#0....5....1....5....2....5....3....5....4....5....5
-# "lastEpochMicrosec": 1476552393091008,
-# Note: the above is expected, but sometimes it's in a different position or
-# corrupted with other output for some reason...
-
- fields = line.split( )
- e = fields[1][0:-1]
- if e.isdigit():
-# print "report_time: ", e, "\n"
- report_time = time.strftime('%Y-%m-%d %H:%M:%S',
- time.localtime(int(e)/1000000))
-
- if "requestRate" in line:
-#....5....1....5....2....5....3....5
-# "requestRate": 2264,
- request_rate = line[27:-2]
- summary = report_time + " app state: " + app_state + ", request rate: " + request_rate
- print_there(1,columns-56,summary)
-#2016-10-16 17:15:29 app state: Started, request rate: 99
-#....5....1....5....2....5....3....5....4....5....5....5....6
- if mode == "c": print '{0} *** app state: {1}\trequest rate: {2}'.format(
- report_time, app_state, request_rate)
-
- if "\"specificProblem\": \"Started\"" in line:
- app_state = "Started"
- status = report_time + " app state change: Started"
- if mode == "c": print '{0} *** app state change: Started'.format(report_time)
-
- if "\"specificProblem\": \"Stopped\"" in line:
- app_state = "Stopped"
- status = report_time + " app state change: Stopped"
- if mode == "c": print '{0} *** app state change: Stopped'.format(report_time)
-
- print_there(1,columns-56,summary)
- print_there(2,columns-56,status)
+ sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text))
+ sys.stdout.flush()
+
+base_url = ''
+template_404 = b'''POST {0}'''
+
+def notfound_404(environ, start_response):
+ print('Unexpected URL/Method: {0} {1}'.format(
+ environ['REQUEST_METHOD'].upper(),
+ environ['PATH_INFO']))
+ start_response('404 Not Found', [ ('Content-type', 'text/plain') ])
+ return [template_404.format(base_url)]
+
+class PathDispatcher:
+ def __init__(self):
+ self.pathmap = { }
+
+ def __call__(self, environ, start_response):
+ #----------------------------------------------------------------------
+ # Extract the method and path from the environment.
+ #----------------------------------------------------------------------
+ method = environ['REQUEST_METHOD'].lower()
+ path = environ['PATH_INFO']
+
+ #----------------------------------------------------------------------
+ # See if we have a handler for this path, and if so invoke it.
+ # Otherwise, return a 404.
+ #----------------------------------------------------------------------
+ handler = self.pathmap.get((method, path), notfound_404)
+ return handler(environ, start_response)
+
+ def register(self, method, path, function):
+ print('Registering for {0} at {1}'.format(method, path))
+ self.pathmap[method.lower(), path] = function
+ return function
+
+#--------------------------------------------------------------------------
+# Event processing
+#--------------------------------------------------------------------------
+def process_event(e):
+ global status
+ global summary
+ global vfStatus
+
+ epoch = e.event.commonEventHeader.lastEpochMicrosec
+
+ report_time = time.strftime('%Y-%m-%d %H:%M:%S',
+ time.localtime(int(epoch)/1000000))
+
+ domain = e.event.commonEventHeader.domain
+
+ if domain == 'measurementsForVfScaling':
+
+ aggregateCpuUsage = e.event.measurementsForVfScaling.aggregateCpuUsage
+ requestRate = e.event.measurementsForVfScaling.requestRate
+ summary = report_time + " app state: " + vfStatus + ", request rate: " + str(requestRate)
+ if monitor_mode == "c": print '{0} *** app state: {1}\trequest rate: {2}'.format(
+ report_time, vfStatus, str(requestRate))
+
+ if domain == 'fault':
+
+ alarmCondition = e.event.faultFields.alarmCondition
+ specificProblem = e.event.faultFields.specificProblem
+# vfStatus = e.event.faultFields.vfStatus
+ vfStatus = e.event.faultFields.specificProblem
+
+ status = report_time + " app state change: " + specificProblem
+ if monitor_mode == "c": print '{0} *** vfStatus change: {1}'.format(report_time,
+ specificProblem)
+
+# print_there only works if SSH'd to the VM manually - need to investigate
+# print_there(1,columns-56,summary)
+ print '{0}'.format(summary)
+# print_there(2,columns-56,status)
+ print '{0}'.format(status)
+
+#--------------------------------------------------------------------------
+# Main monitoring and logging procedure
+#--------------------------------------------------------------------------
+def ves_monitor(environ, start_response):
+
+ # Check for keyboard input
+ if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
+ line = sys.stdin.readline()
+ if "f" in line: monitor_mode = "f"
+ if "c" in line: monitor_mode = "c"
+
+ print('==== ' + time.asctime() + ' ' + '=' * 49)
+
+ #--------------------------------------------------------------------------
+ # Extract the content from the request.
+ #--------------------------------------------------------------------------
+ length = int(environ.get('CONTENT_LENGTH', '0'))
+ body = environ['wsgi.input'].read(length)
+
+ mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION',
+ 'None None'))
+ if (b64_credentials != 'None'):
+ credentials = b64decode(b64_credentials)
+ else:
+ credentials = None
+
+ #--------------------------------------------------------------------------
+ # See whether the user authenticated themselves correctly.
+ #--------------------------------------------------------------------------
+ if (credentials == (vel_username + ':' + vel_password)):
+ start_response('204 No Content', [])
+ yield ''
+ else:
+ print('Failed to authenticate agent')
+ start_response('401 Unauthorized', [ ('Content-type',
+ 'application/json')])
+ req_error = { 'requestError': {
+ 'policyException': {
+ 'messageId': 'POL0001',
+ 'text': 'Failed to authenticate'
+ }
+ }
+ }
+ yield json.dumps(req_error)
+
+ #--------------------------------------------------------------------------
+ # Decode the JSON body
+ #--------------------------------------------------------------------------
+
+ try:
+ decoded_body = json.loads(body)
+ print('{0}'.format(json.dumps(decoded_body,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
+ decoded_body = json.loads(body, object_hook=JSONObject)
+ process_event(decoded_body)
+
+ except Exception as e:
+ print('JSON body is not valid for unexpected reason! {0}'.format(e))
+
+def main(argv=None):
+ global columns
+ global rows
+ a,b = os.popen('stty size', 'r').read().split()
+ rows = int(a)
+ columns = int(b)
+
+ if argv is None:
+ argv = sys.argv
+ else:
+ sys.argv.extend(argv)
+
+ try:
+ #----------------------------------------------------------------------
+ # Setup argument parser so we can parse the command-line.
+ #----------------------------------------------------------------------
+ parser = ArgumentParser(description='',
+ formatter_class=ArgumentDefaultsHelpFormatter)
+ parser.add_argument('-v', '--verbose',
+ dest='verbose',
+ action='count',
+ help='set verbosity level')
+ parser.add_argument('-V', '--version',
+ action='version',
+ version='1.0',
+ help='Display version information')
+ parser.add_argument('-c', '--config',
+ dest='config',
+ default='/etc/opt/att/collector.conf',
+ help='Use this config file.',
+ metavar='<file>')
+ parser.add_argument('-s', '--section',
+ dest='section',
+ default='default',
+ metavar='<section>',
+ help='section to use in the config file')
+
+ #----------------------------------------------------------------------
+ # Process arguments received.
+ #----------------------------------------------------------------------
+ args = parser.parse_args()
+ verbose = args.verbose
+ config_file = args.config
+ config_section = args.section
+ #----------------------------------------------------------------------
+ # Now read the config file, using command-line supplied values as
+ # overrides.
+ #----------------------------------------------------------------------
+ defaults = {'log_file': 'ves.log',
+ 'vel_port': '30000',
+ 'vel_path': '',
+ 'vel_topic_name': ''
+ }
+ overrides = {}
+ config = ConfigParser.SafeConfigParser(defaults)
+ config.read(config_file)
+
+ #----------------------------------------------------------------------
+ # extract the values we want.
+ #----------------------------------------------------------------------
+ log_file = config.get(config_section, 'log_file', vars=overrides)
+ vel_port = config.get(config_section, 'vel_port', vars=overrides)
+ vel_path = config.get(config_section, 'vel_path', vars=overrides)
+ vel_topic_name = config.get(config_section,
+ 'vel_topic_name',
+ vars=overrides)
+ global vel_username
+ global vel_password
+ vel_username = config.get(config_section,
+ 'vel_username',
+ vars=overrides)
+ vel_password = config.get(config_section,
+ 'vel_password',
+ vars=overrides)
+ vel_schema_file = config.get(config_section,
+ 'schema_file',
+ vars=overrides)
+ base_schema_file = config.get(config_section,
+ 'base_schema_file',
+ vars=overrides)
+
+ #----------------------------------------------------------------------
+ # Perform some basic error checking on the config.
+ #----------------------------------------------------------------------
+ if (int(vel_port) < 1024 or int(vel_port) > 65535):
+ raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
+ 'specified'.format(vel_port))
+
+ if (len(vel_path) > 0 and vel_path[-1] != '/'):
+ vel_path += '/'
+
+ #----------------------------------------------------------------------
+ # Load up the vel_schema and base_schema, if they exist.
+ #----------------------------------------------------------------------
+ if (os.path.exists(vel_schema_file)):
+ global vel_schema
+ vel_schema = json.load(open(vel_schema_file, 'r'))
+ if (os.path.exists(base_schema_file)):
+ base_schema = json.load(open(base_schema_file, 'r'))
+ vel_schema.update(base_schema)
+
+ #----------------------------------------------------------------------
+ # We are now ready to get started with processing. Start-up the various
+ # components of the system in order:
+ #
+ # 1) Create the dispatcher.
+ # 2) Register the functions for the URLs of interest.
+ # 3) Run the webserver.
+ #----------------------------------------------------------------------
+ root_url = '/{0}eventListener/v{1}{2}'.format(vel_path,
+ '1',
+ '/' + vel_topic_name
+ if len(vel_topic_name) > 0
+ else '')
+
+ base_url = root_url
+ dispatcher = PathDispatcher()
+ dispatcher.register('GET', root_url, ves_monitor)
+ dispatcher.register('POST', root_url, ves_monitor)
+ httpd = make_server('', 30000, dispatcher, handler_class=NoLoggingWSGIRequestHandler)
+ httpd.serve_forever()
+
+ return 0
+
+ except Exception as e:
+ #----------------------------------------------------------------------
+ # Handle unexpected exceptions.
+ #----------------------------------------------------------------------
+ indent = len('VES Monitor') * ' '
+ sys.stderr.write('VES Monitor: ' + repr(e) + '\n')
+ sys.stderr.write(indent + ' for help use --help\n')
+ sys.stderr.write(traceback.format_exc())
+ return 2
+#------------------------------------------------------------------------------
+# MAIN SCRIPT ENTRY POINT.
+#------------------------------------------------------------------------------
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/tests/vHello_VES.sh b/tests/vHello_VES.sh
index abb9687..6bc6bbf 100644
--- a/tests/vHello_VES.sh
+++ b/tests/vHello_VES.sh
@@ -22,7 +22,7 @@
# $ git clone https://gerrit.opnfv.org/gerrit/ves
# $ cd ves/tests
# $ bash vHello_VES.sh [setup|start|run|test|stop|clean]
-# [collector|traffic|pause|nic]
+# [monitor|traffic|pause|nic]
# setup: setup test environment
# start: install blueprint and run test
# run: setup test environment and run test
@@ -30,7 +30,7 @@
# stop: stop test and uninstall blueprint
# clean: cleanup after test
# Test:
-# collector: attach to the collector VM and run the collector
+# monitor: attach to the collector VM and run the VES Monitor
# traffic: generate some traffic
# pause: pause the VNF (web server) for a minute to generate a state change
# nic: timed ifdown/ifup to generate a NIC fault report
@@ -228,6 +228,8 @@ git clone https://github.com/att/evel-test-collector.git
sed -i -- 's/vel_username = /vel_username = hello/' evel-test-collector/config/collector.conf
sed -i -- 's/vel_password = /vel_password = world/' evel-test-collector/config/collector.conf
EOF
+ # Replacing the default collector with monitor.py which has processing logic as well
+ scp -i /tmp/tacker/vHello.pem -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no /tmp/tacker/blueprints/tosca-vnfd-hello-ves/monitor.py ubuntu@$VDU2_IP:/home/ubuntu/monitor.py
echo "$0: start vHello web server in VDU1"
ssh -i /tmp/tacker/vHello.pem -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$VDU1_IP "sudo chown ubuntu /home/ubuntu"
@@ -270,19 +272,15 @@ get_vdu_ip () {
vdu_ip=$(openstack server list | awk "/$1/ { print \$10 }")
}
-collector () {
- echo "$0: Start the VES Collector in VDU2 - Stop first if running"
+monitor () {
+ echo "$0: Start the VES Monitor in VDU2 - Stop first if running"
get_vdu_ip VDU2
sudo cp /tmp/tacker/vHello.pem /tmp/vHello.pem
sudo chown $USER:$USER /tmp/vHello.pem
chmod 600 /tmp/vHello.pem
- ssh -i /tmp/vHello.pem -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$vdu_ip << 'EOF'
+ ssh -t -t -i /tmp/vHello.pem -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$vdu_ip << 'EOF'
sudo kill $(ps -ef | grep evel-test-collector | awk '{print $2}')
-cd /home/ubuntu/
-nohup python evel-test-collector/code/collector/collector.py \
- --config evel-test-collector/config/collector.conf \
- --section default \
- --verbose >~/ves.log &
+python monitor.py --config evel-test-collector/config/collector.conf --section default
EOF
}
@@ -350,5 +348,16 @@ case "$1" in
echo "run: setup test environment and run test"
echo "stop: stop test and uninstall blueprint"
echo "clean: cleanup after test"
+ echo "usage: bash vHello_VES.sh [setup|start|run|test|stop|clean] [monitor|traffic|pause|nic]"
+ echo "setup: setup test environment"
+ echo "start: install blueprint and run test"
+ echo "run: setup test environment and run test"
+ echo "test: run test tools/scenario - see below"
+ echo "stop: stop test and uninstall blueprint"
+ echo "clean: cleanup after test"
+ echo "Test:"
+ echo " monitor: attach to the collector VM and run the VES Monitor"
+ echo " traffic: generate some traffic"
+ echo " pause: pause the VNF (web server) for a minute to generate a state change"
fail
esac