From 152d3f49f8208adc1d0b37d41356163ca65fe0ca Mon Sep 17 00:00:00 2001 From: Kerim Gokarslan Date: Tue, 17 Oct 2017 15:17:44 -0700 Subject: NFVBENCH-42 Add multiple fluentd aggregators support Change-Id: I5b752f9ad4f7b4a60f2678d22467db570e02ab82 Signed-off-by: Kerim Gokarslan --- docs/testing/user/userguide/fluentd.rst | 2 +- nfvbench/cfg.default.yaml | 36 +++++++++++++++++++------------- nfvbench/fluentd.py | 36 ++++++++++++++++++++++---------- nfvbench/nfvbench.py | 37 ++++++++++----------------------- nfvbench/nfvbenchd.py | 18 +++++----------- test/test_nfvbench.py | 35 ++++++++++++++++++++++++++++++- 6 files changed, 98 insertions(+), 66 deletions(-) diff --git a/docs/testing/user/userguide/fluentd.rst b/docs/testing/user/userguide/fluentd.rst index 465d0a3..889d598 100644 --- a/docs/testing/user/userguide/fluentd.rst +++ b/docs/testing/user/userguide/fluentd.rst @@ -187,4 +187,4 @@ And the results of this command obtained from fluentd by elasticsearch: "sort": [ 1508264203755 ] - } \ No newline at end of file + } diff --git a/nfvbench/cfg.default.yaml b/nfvbench/cfg.default.yaml index c26991c..2892d11 100644 --- a/nfvbench/cfg.default.yaml +++ b/nfvbench/cfg.default.yaml @@ -367,24 +367,32 @@ debug: false # Defaults to disabled log_file: -# When enabled, all logs will be sent to a fluentd server at the requested IP and port -# The fluentd "tag" and "label" fields for every message will be set to "nfvbench" +# When enabled, all results and/or logs will be sent to a fluentd servers at the requested IPs and ports +# A list of one or more fluentd servers identified by their IPs and port numbers should be given. +# For each recipient it is possible to enable both sending logs and performance +# results, or enable either logs or performance results. For enabling logs or results logging_tag or +# result_tag should be set. + fluentd: - # by default (logging_tag is empty) nfvbench log messages are not sent to fluentd - # to enable logging to fluents, specify a valid fluentd tag name to be used for the - # log records - logging_tag: + # by default (logging_tag is empty) nfvbench log messages are not sent to fluentd + # to enable logging to fluents, specify a valid fluentd tag name to be used for the + # log records + - logging_tag: + + # by default (result_tag is empty) nfvbench results are not sent to fluentd + # to enable sending nfvbench results to fluentd, specify a valid fluentd tag name + # to be used for the results records, which is different than logging_tag + result_tag: - # by default (result_tag is empty) nfvbench results are not sent to fluentd - # to enable sending nfvbench results to fluentd, specify a valid fluentd tag name - # to be used for the results records, which is different than logging_tag - result_tag: + # IP address of the server, defaults to loopback + ip: 127.0.0.1 - # IP address of the server, defaults to loopback - ip: 127.0.0.1 + # port # to use, by default, use the default fluentd forward port + port: 24224 - # port # to use, by default, use the default fluentd forward port - port: 24224 + # by default (logging_tag is empty) nfvbench log messages are not sent to fluentd + # to enable logging to fluents, specify a valid fluentd tag name to be used for the + # log records # Module and class name of factory which will be used to provide classes dynamically for other components. factory_module: 'nfvbench.factory' diff --git a/nfvbench/fluentd.py b/nfvbench/fluentd.py index 16ff33e..628b968 100644 --- a/nfvbench/fluentd.py +++ b/nfvbench/fluentd.py @@ -28,24 +28,34 @@ class FluentLogHandler(logging.Handler): - the level name - the runlogdate (to tie multiple run-related logs together) The timestamp is retrieved by the fluentd library. + There will be only one instance of FluentLogHandler running. ''' - def __init__(self, tag, fluentd_ip='127.0.0.1', fluentd_port=24224): + def __init__(self, fluentd_configs): logging.Handler.__init__(self) - self.tag = tag - self.formatter = logging.Formatter('%(message)s') - self.sender = sender.FluentSender(self.tag, host=fluentd_ip, port=fluentd_port) + self.log_senders = [] + self.result_senders = [] self.runlogdate = 0 + self.formatter = logging.Formatter('%(message)s') + for fluentd_config in fluentd_configs: + if fluentd_config.logging_tag: + self.log_senders.append( + sender.FluentSender(fluentd_config.logging_tag, host=fluentd_config.ip, + port=fluentd_config.port)) + if fluentd_config.result_tag: + self.result_senders.append( + sender.FluentSender(fluentd_config.result_tag, host=fluentd_config.ip, + port=fluentd_config.port)) self.__warning_counter = 0 self.__error_counter = 0 def start_new_run(self): '''Delimitate a new run in the stream of records with a new timestamp ''' - self.runlogdate = self.__get_timestamp() # reset counters self.__warning_counter = 0 self.__error_counter = 0 + self.runlogdate = self.__get_timestamp() # send start record self.__send_start_record() @@ -60,13 +70,15 @@ class FluentLogHandler(logging.Handler): data["runlogdate"] = self.runlogdate self.__update_stats(record.levelno) - self.sender.emit(None, data) + for log_sender in self.log_senders: + log_sender.emit(None, data) - # this function is called by summarizer + # this function is called by summarizer, and used for sending results def record_send(self, record): - self.sender.emit(None, record) + for result_sender in self.result_senders: + result_sender.emit(None, record) - # send START record for each run + # send START log record for each run def __send_start_record(self): data = { "runlogdate": self.runlogdate, @@ -77,7 +89,8 @@ class FluentLogHandler(logging.Handler): "numwarnings": 0, "@timestamp": self.__get_timestamp() } - self.sender.emit(None, data) + for log_sender in self.log_senders: + log_sender.emit(None, data) # send stats related to the current run and reset state for a new run def send_run_summary(self, run_summary_required): @@ -94,7 +107,8 @@ class FluentLogHandler(logging.Handler): # so don't send runlogdate if self.runlogdate != 0: data["runlogdate"] = self.runlogdate - self.sender.emit(None, data) + for log_sender in self.log_senders: + log_sender.emit(None, data) def __get_highest_level(self): if self.__error_counter > 0: diff --git a/nfvbench/nfvbench.py b/nfvbench/nfvbench.py index bbee4f4..d1bd0d9 100644 --- a/nfvbench/nfvbench.py +++ b/nfvbench/nfvbench.py @@ -131,13 +131,7 @@ class NFVBench(object): def prepare_summary(self, result): """Prepares summary of the result to print and send it to logger (eg: fluentd)""" global fluent_logger - sender = None - if self.config.fluentd.result_tag: - sender = FluentLogHandler(self.config.fluentd.result_tag, - fluentd_ip=self.config.fluentd.ip, - fluentd_port=self.config.fluentd.port) - sender.runlogdate = fluent_logger.runlogdate - summary = NFVBenchSummarizer(result, sender) + summary = NFVBenchSummarizer(result, fluent_logger) LOG.info(str(summary)) def save(self, result): @@ -446,18 +440,17 @@ def main(): config = config_plugin.get_config() openstack_spec = config_plugin.get_openstack_spec() - # setup the fluent logger as soon as possible right after the config plugin is called - if config.fluentd.logging_tag: - fluent_logger = FluentLogHandler(config.fluentd.logging_tag, - fluentd_ip=config.fluentd.ip, - fluentd_port=config.fluentd.port) - LOG.addHandler(fluent_logger) - else: - fluent_logger = None - opts, unknown_opts = parse_opts_from_cli() log.set_level(debug=opts.debug) + # setup the fluent logger as soon as possible right after the config plugin is called, + # if there is any logging or result tag is set then initialize the fluent logger + for fluentd in config.fluentd: + if fluentd.logging_tag or fluentd.result_tag: + fluent_logger = FluentLogHandler(config.fluentd) + LOG.addHandler(fluent_logger) + break + if opts.version: print pbr.version.VersionInfo('nfvbench').version_string_with_vcs() sys.exit(0) @@ -467,14 +460,7 @@ def main(): result = json.load(json_data) if opts.user_label: result['config']['user_label'] = opts.user_label - if config.fluentd.result_tag: - sender = FluentLogHandler(config.fluentd.result_tag, - fluentd_ip=config.fluentd.ip, - fluentd_port=config.fluentd.port) - sender.runlogdate = fluent_logger.runlogdate - print NFVBenchSummarizer(result, sender) - else: - print NFVBenchSummarizer(result, None) + print NFVBenchSummarizer(result, fluent_logger) sys.exit(0) # show default config in text/yaml format @@ -531,8 +517,7 @@ def main(): if opts.server: if os.path.isdir(opts.server): - server = WebSocketIoServer(opts.server, nfvbench_instance, fluent_logger, - config.fluentd.result_tag) + server = WebSocketIoServer(opts.server, nfvbench_instance, fluent_logger) nfvbench_instance.set_notifier(server) try: port = int(opts.port) diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py index 1797496..4772700 100644 --- a/nfvbench/nfvbenchd.py +++ b/nfvbench/nfvbenchd.py @@ -26,7 +26,6 @@ from flask import request from flask_socketio import emit from flask_socketio import SocketIO -from fluentd import FluentLogHandler from summarizer import NFVBenchSummarizer from log import LOG @@ -210,17 +209,10 @@ class WebSocketIoServer(object): of this class and pass a runner object then invoke the run method """ - def __init__(self, http_root, runner, logger, result_tag): + def __init__(self, http_root, runner, fluent_logger): self.nfvbench_runner = runner setup_flask(http_root) - self.fluent_logger = logger - self.result_fluent_logger = None - if result_tag: - self.result_fluent_logger = \ - FluentLogHandler(result_tag, - fluentd_ip=self.fluent_logger.sender.host, - fluentd_port=self.fluent_logger.sender.port) - self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate + self.fluent_logger = fluent_logger def run(self, host='127.0.0.1', port=7556): @@ -240,6 +232,8 @@ class WebSocketIoServer(object): # remove unfilled values as we do not want them to override default values with None config = {k: v for k, v in config.items() if v is not None} with RunLock(): + if self.fluent_logger: + self.fluent_logger.start_new_run() results = self.nfvbench_runner.run(config, config) except Exception as exc: print 'NFVbench runner exception:' @@ -252,9 +246,7 @@ class WebSocketIoServer(object): else: # this might overwrite a previously unfetched result Ctx.set_result(results) - if self.fluent_logger: - self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate - summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger) + summary = NFVBenchSummarizer(results['result'], self.fluent_logger) LOG.info(str(summary)) Ctx.release() if self.fluent_logger: diff --git a/test/test_nfvbench.py b/test/test_nfvbench.py index 85342bb..2578407 100644 --- a/test/test_nfvbench.py +++ b/test/test_nfvbench.py @@ -786,11 +786,44 @@ def test_config(): def test_fluentd(): logger = logging.getLogger('fluent-logger') - handler = FluentLogHandler('nfvbench', fluentd_port=7081) + + class FluentdConfig(dict): + def __getattr__(self, attr): + return self.get(attr) + + fluentd_configs = [ + FluentdConfig({ + 'logging_tag': 'nfvbench', + 'result_tag': 'resultnfvbench', + 'ip': '127.0.0.1', + 'port': 7081 + }), + FluentdConfig({ + 'logging_tag': 'nfvbench', + 'result_tag': 'resultnfvbench', + 'ip': '127.0.0.1', + 'port': 24224 + }), + FluentdConfig({ + 'logging_tag': None, + 'result_tag': 'resultnfvbench', + 'ip': '127.0.0.1', + 'port': 7082 + }), + FluentdConfig({ + 'logging_tag': 'nfvbench', + 'result_tag': None, + 'ip': '127.0.0.1', + 'port': 7083 + }) + ] + + handler = FluentLogHandler(fluentd_configs=fluentd_configs) logger.addHandler(handler) logger.setLevel(logging.INFO) logger.info('test') logger.warning('test %d', 100) + try: raise Exception("test") except Exception: -- cgit 1.2.3-korg