From 00410ac4f2b167af4d6cd01072c791ac6804ce5a Mon Sep 17 00:00:00 2001 From: Kerim Gokarslan Date: Thu, 31 Aug 2017 12:39:07 -0700 Subject: NFVBENCH-12 Add run summary to fluentd stream NFVBENCH-14 Fluentd server IP adress from the config is not picked up when configured NFVBENCH-16 At the start of each run send a new fluent record with the config Change-Id: I2d76ecee5b1b93dad4eeccd67d5ed906a0a8faba Signed-off-by: Kerim Gokarslan --- nfvbench/fluentd.py | 61 +++++++++++++++++++++++++++++++++++++++++++++++++-- nfvbench/nfvbench.py | 22 +++++++++++++++---- nfvbench/nfvbenchd.py | 9 ++++++-- 3 files changed, 84 insertions(+), 8 deletions(-) (limited to 'nfvbench') diff --git a/nfvbench/fluentd.py b/nfvbench/fluentd.py index 683d4ce..5bfcebe 100644 --- a/nfvbench/fluentd.py +++ b/nfvbench/fluentd.py @@ -16,6 +16,7 @@ from datetime import datetime from fluent import sender import logging + class FluentLogHandler(logging.Handler): '''This is a minimalist log handler for use with Fluentd @@ -26,17 +27,25 @@ class FluentLogHandler(logging.Handler): - the runlogdate (to tie multiple run-related logs together) The timestamp is retrieved by the fluentd library. ''' + def __init__(self, tag, fluentd_ip='127.0.0.1', fluentd_port=24224): logging.Handler.__init__(self) self.tag = tag self.formatter = logging.Formatter('%(message)s') - self.sender = sender.FluentSender(self.tag, port=fluentd_port) - self.start_new_run() + self.sender = sender.FluentSender(self.tag, host=fluentd_ip, port=fluentd_port) + self.runlogdate = 0 + self.__warning_counter = 0 + self.__error_counter = 0 def start_new_run(self): '''Delimitate a new run in the stream of records with a new timestamp ''' self.runlogdate = str(datetime.now()) + # reset counters + self.__warning_counter = 0 + self.__error_counter = 0 + # send start record + self.__send_start_record() def emit(self, record): data = { @@ -44,4 +53,52 @@ class FluentLogHandler(logging.Handler): "loglevel": record.levelname, "message": self.formatter.format(record) } + self.__update_stats(record.levelno) self.sender.emit(None, data) + + # send START record for each run + def __send_start_record(self): + data = { + "runlogdate": self.runlogdate, + "loglevel": "START", + "message": "NFVBENCH run is started", + "numloglevel": 0, + "numerrors": 0, + "numwarnings": 0 + } + self.sender.emit(None, data) + + # send stats related to the current run and reset state for a new run + def send_run_summary(self, run_summary_required): + if run_summary_required or self.__get_highest_level() == logging.ERROR: + data = { + "runlogdate": self.runlogdate, + "loglevel": "RUN_SUMMARY", + "message": self.__get_highest_level_desc(), + "numloglevel": self.__get_highest_level(), + "numerrors": self.__error_counter, + "numwarnings": self.__warning_counter + } + self.sender.emit(None, data) + + def __get_highest_level(self): + if self.__error_counter > 0: + return logging.ERROR + elif self.__warning_counter > 0: + return logging.WARNING + return logging.INFO + + def __get_highest_level_desc(self): + highest_level = self.__get_highest_level() + if highest_level == logging.INFO: + return "GOOD RUN" + elif highest_level == logging.WARNING: + return "RUN WITH WARNINGS" + else: + return "RUN WITH ERRORS" + + def __update_stats(self, levelno): + if levelno == logging.WARNING: + self.__warning_counter += 1 + elif levelno == logging.ERROR: + self.__error_counter += 1 diff --git a/nfvbench/nfvbench.py b/nfvbench/nfvbench.py index b36d328..37645aa 100644 --- a/nfvbench/nfvbench.py +++ b/nfvbench/nfvbench.py @@ -44,6 +44,7 @@ import utils fluent_logger = None + class NFVBench(object): """Main class of NFV benchmarking tool.""" STATUS_OK = 'OK' @@ -75,7 +76,7 @@ class NFVBench(object): def set_notifier(self, notifier): self.notifier = notifier - def run(self, opts): + def run(self, opts, args): status = NFVBench.STATUS_OK result = None message = '' @@ -83,6 +84,7 @@ class NFVBench(object): # take a snapshot of the current time for this new run # so that all subsequent logs can relate to this run fluent_logger.start_new_run() + LOG.info(args) try: self.update_config(opts) self.setup() @@ -407,6 +409,7 @@ def override_custom_traffic(config, frame_sizes, unidir): "profile": traffic_profile_name } + def check_physnet(name, netattrs): if not netattrs.physical_network: raise Exception("SRIOV requires physical_network to be specified for the {n} network" @@ -415,8 +418,10 @@ def check_physnet(name, netattrs): raise Exception("SRIOV requires segmentation_id to be specified for the {n} network" .format(n=name)) + def main(): global fluent_logger + run_summary_required = False try: log.setup() # load default config file @@ -508,7 +513,7 @@ def main(): if opts.server: if os.path.isdir(opts.server): - server = WebSocketIoServer(opts.server, nfvbench) + server = WebSocketIoServer(opts.server, nfvbench, fluent_logger) nfvbench.set_notifier(server) try: port = int(opts.port) @@ -521,6 +526,7 @@ def main(): sys.exit(1) else: with utils.RunLock(): + run_summary_required = True if unknown_opts: err_msg = 'Unknown options: ' + ' '.join(unknown_opts) LOG.error(err_msg) @@ -528,7 +534,9 @@ def main(): # remove unfilled values opts = {k: v for k, v in vars(opts).iteritems() if v is not None} - result = nfvbench.run(opts) + # get CLI args + params = ' '.join(str(e) for e in sys.argv[1:]) + result = nfvbench.run(opts, params) if 'error_message' in result: raise Exception(result['error_message']) @@ -536,12 +544,18 @@ def main(): nfvbench.save(result['result']) nfvbench.print_summary(result['result']) except Exception as exc: + run_summary_required = True LOG.error({ 'status': NFVBench.STATUS_ERROR, 'error_message': traceback.format_exc() }) print str(exc) - sys.exit(1) + finally: + if fluent_logger: + # only send a summary record if there was an actual nfvbench run or + # if an error/exception was logged. + fluent_logger.send_run_summary(run_summary_required) + if __name__ == '__main__': main() diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py index aef896a..4bbd69d 100644 --- a/nfvbench/nfvbenchd.py +++ b/nfvbench/nfvbenchd.py @@ -23,6 +23,7 @@ from flask_socketio import emit from flask_socketio import SocketIO import json +from log import LOG import Queue import traceback from utils import byteify @@ -206,9 +207,10 @@ class WebSocketIoServer(object): notifications using websocket events (send_ methods). Caller should simply create an instance of this class and pass a runner object then invoke the run method """ - def __init__(self, http_root, runner): + def __init__(self, http_root, runner, logger): self.nfvbench_runner = runner setup_flask(http_root) + self.fluent_logger = logger def run(self, host='127.0.0.1', port=7556): @@ -219,6 +221,7 @@ class WebSocketIoServer(object): # wait for run requests # the runner must be executed from the main thread (Trex client library requirement) while True: + # print 'main thread waiting for requests...' config = Ctx.dequeue() # print 'main thread processing request...' @@ -227,11 +230,12 @@ class WebSocketIoServer(object): # remove unfilled values as we do not want them to override default values with None config = {k: v for k, v in config.items() if v is not None} with RunLock(): - results = self.nfvbench_runner.run(config) + results = self.nfvbench_runner.run(config, config) except Exception as exc: print 'NFVbench runner exception:' traceback.print_exc() results = result_json(STATUS_ERROR, str(exc)) + LOG.exception() if Ctx.request_from_socketio: socketio.emit('run_end', results) @@ -239,6 +243,7 @@ class WebSocketIoServer(object): # this might overwrite a previously unfetched result Ctx.set_result(results) Ctx.release() + self.fluent_logger.send_run_summary(True) def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct): stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct} -- cgit 1.2.3-korg