summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKerim Gokarslan <kgokarsl@cisco.com>2017-10-17 15:17:44 -0700
committerahothan <ahothan@cisco.com>2017-10-20 00:37:14 -0700
commit152d3f49f8208adc1d0b37d41356163ca65fe0ca (patch)
tree1db7e5bd957ce94ca068f6c3681588aaeb545a11
parent393e05913cbe555457d7e63266240b3bf44d9a98 (diff)
NFVBENCH-42 Add multiple fluentd aggregators support
Change-Id: I5b752f9ad4f7b4a60f2678d22467db570e02ab82 Signed-off-by: Kerim Gokarslan <kgokarsl@cisco.com>
-rw-r--r--docs/testing/user/userguide/fluentd.rst2
-rw-r--r--nfvbench/cfg.default.yaml36
-rw-r--r--nfvbench/fluentd.py36
-rw-r--r--nfvbench/nfvbench.py37
-rw-r--r--nfvbench/nfvbenchd.py18
-rw-r--r--test/test_nfvbench.py35
6 files changed, 98 insertions, 66 deletions
diff --git a/docs/testing/user/userguide/fluentd.rst b/docs/testing/user/userguide/fluentd.rst
index 465d0a3..889d598 100644
--- a/docs/testing/user/userguide/fluentd.rst
+++ b/docs/testing/user/userguide/fluentd.rst
@@ -187,4 +187,4 @@ And the results of this command obtained from fluentd by elasticsearch:
"sort": [
1508264203755
]
- } \ No newline at end of file
+ }
diff --git a/nfvbench/cfg.default.yaml b/nfvbench/cfg.default.yaml
index c26991c..2892d11 100644
--- a/nfvbench/cfg.default.yaml
+++ b/nfvbench/cfg.default.yaml
@@ -367,24 +367,32 @@ debug: false
# Defaults to disabled
log_file:
-# When enabled, all logs will be sent to a fluentd server at the requested IP and port
-# The fluentd "tag" and "label" fields for every message will be set to "nfvbench"
+# When enabled, all results and/or logs will be sent to a fluentd servers at the requested IPs and ports
+# A list of one or more fluentd servers identified by their IPs and port numbers should be given.
+# For each recipient it is possible to enable both sending logs and performance
+# results, or enable either logs or performance results. For enabling logs or results logging_tag or
+# result_tag should be set.
+
fluentd:
- # by default (logging_tag is empty) nfvbench log messages are not sent to fluentd
- # to enable logging to fluents, specify a valid fluentd tag name to be used for the
- # log records
- logging_tag:
+ # by default (logging_tag is empty) nfvbench log messages are not sent to fluentd
+ # to enable logging to fluents, specify a valid fluentd tag name to be used for the
+ # log records
+ - logging_tag:
+
+ # by default (result_tag is empty) nfvbench results are not sent to fluentd
+ # to enable sending nfvbench results to fluentd, specify a valid fluentd tag name
+ # to be used for the results records, which is different than logging_tag
+ result_tag:
- # by default (result_tag is empty) nfvbench results are not sent to fluentd
- # to enable sending nfvbench results to fluentd, specify a valid fluentd tag name
- # to be used for the results records, which is different than logging_tag
- result_tag:
+ # IP address of the server, defaults to loopback
+ ip: 127.0.0.1
- # IP address of the server, defaults to loopback
- ip: 127.0.0.1
+ # port # to use, by default, use the default fluentd forward port
+ port: 24224
- # port # to use, by default, use the default fluentd forward port
- port: 24224
+ # by default (logging_tag is empty) nfvbench log messages are not sent to fluentd
+ # to enable logging to fluents, specify a valid fluentd tag name to be used for the
+ # log records
# Module and class name of factory which will be used to provide classes dynamically for other components.
factory_module: 'nfvbench.factory'
diff --git a/nfvbench/fluentd.py b/nfvbench/fluentd.py
index 16ff33e..628b968 100644
--- a/nfvbench/fluentd.py
+++ b/nfvbench/fluentd.py
@@ -28,24 +28,34 @@ class FluentLogHandler(logging.Handler):
- the level name
- the runlogdate (to tie multiple run-related logs together)
The timestamp is retrieved by the fluentd library.
+ There will be only one instance of FluentLogHandler running.
'''
- def __init__(self, tag, fluentd_ip='127.0.0.1', fluentd_port=24224):
+ def __init__(self, fluentd_configs):
logging.Handler.__init__(self)
- self.tag = tag
- self.formatter = logging.Formatter('%(message)s')
- self.sender = sender.FluentSender(self.tag, host=fluentd_ip, port=fluentd_port)
+ self.log_senders = []
+ self.result_senders = []
self.runlogdate = 0
+ self.formatter = logging.Formatter('%(message)s')
+ for fluentd_config in fluentd_configs:
+ if fluentd_config.logging_tag:
+ self.log_senders.append(
+ sender.FluentSender(fluentd_config.logging_tag, host=fluentd_config.ip,
+ port=fluentd_config.port))
+ if fluentd_config.result_tag:
+ self.result_senders.append(
+ sender.FluentSender(fluentd_config.result_tag, host=fluentd_config.ip,
+ port=fluentd_config.port))
self.__warning_counter = 0
self.__error_counter = 0
def start_new_run(self):
'''Delimitate a new run in the stream of records with a new timestamp
'''
- self.runlogdate = self.__get_timestamp()
# reset counters
self.__warning_counter = 0
self.__error_counter = 0
+ self.runlogdate = self.__get_timestamp()
# send start record
self.__send_start_record()
@@ -60,13 +70,15 @@ class FluentLogHandler(logging.Handler):
data["runlogdate"] = self.runlogdate
self.__update_stats(record.levelno)
- self.sender.emit(None, data)
+ for log_sender in self.log_senders:
+ log_sender.emit(None, data)
- # this function is called by summarizer
+ # this function is called by summarizer, and used for sending results
def record_send(self, record):
- self.sender.emit(None, record)
+ for result_sender in self.result_senders:
+ result_sender.emit(None, record)
- # send START record for each run
+ # send START log record for each run
def __send_start_record(self):
data = {
"runlogdate": self.runlogdate,
@@ -77,7 +89,8 @@ class FluentLogHandler(logging.Handler):
"numwarnings": 0,
"@timestamp": self.__get_timestamp()
}
- self.sender.emit(None, data)
+ for log_sender in self.log_senders:
+ log_sender.emit(None, data)
# send stats related to the current run and reset state for a new run
def send_run_summary(self, run_summary_required):
@@ -94,7 +107,8 @@ class FluentLogHandler(logging.Handler):
# so don't send runlogdate
if self.runlogdate != 0:
data["runlogdate"] = self.runlogdate
- self.sender.emit(None, data)
+ for log_sender in self.log_senders:
+ log_sender.emit(None, data)
def __get_highest_level(self):
if self.__error_counter > 0:
diff --git a/nfvbench/nfvbench.py b/nfvbench/nfvbench.py
index bbee4f4..d1bd0d9 100644
--- a/nfvbench/nfvbench.py
+++ b/nfvbench/nfvbench.py
@@ -131,13 +131,7 @@ class NFVBench(object):
def prepare_summary(self, result):
"""Prepares summary of the result to print and send it to logger (eg: fluentd)"""
global fluent_logger
- sender = None
- if self.config.fluentd.result_tag:
- sender = FluentLogHandler(self.config.fluentd.result_tag,
- fluentd_ip=self.config.fluentd.ip,
- fluentd_port=self.config.fluentd.port)
- sender.runlogdate = fluent_logger.runlogdate
- summary = NFVBenchSummarizer(result, sender)
+ summary = NFVBenchSummarizer(result, fluent_logger)
LOG.info(str(summary))
def save(self, result):
@@ -446,18 +440,17 @@ def main():
config = config_plugin.get_config()
openstack_spec = config_plugin.get_openstack_spec()
- # setup the fluent logger as soon as possible right after the config plugin is called
- if config.fluentd.logging_tag:
- fluent_logger = FluentLogHandler(config.fluentd.logging_tag,
- fluentd_ip=config.fluentd.ip,
- fluentd_port=config.fluentd.port)
- LOG.addHandler(fluent_logger)
- else:
- fluent_logger = None
-
opts, unknown_opts = parse_opts_from_cli()
log.set_level(debug=opts.debug)
+ # setup the fluent logger as soon as possible right after the config plugin is called,
+ # if there is any logging or result tag is set then initialize the fluent logger
+ for fluentd in config.fluentd:
+ if fluentd.logging_tag or fluentd.result_tag:
+ fluent_logger = FluentLogHandler(config.fluentd)
+ LOG.addHandler(fluent_logger)
+ break
+
if opts.version:
print pbr.version.VersionInfo('nfvbench').version_string_with_vcs()
sys.exit(0)
@@ -467,14 +460,7 @@ def main():
result = json.load(json_data)
if opts.user_label:
result['config']['user_label'] = opts.user_label
- if config.fluentd.result_tag:
- sender = FluentLogHandler(config.fluentd.result_tag,
- fluentd_ip=config.fluentd.ip,
- fluentd_port=config.fluentd.port)
- sender.runlogdate = fluent_logger.runlogdate
- print NFVBenchSummarizer(result, sender)
- else:
- print NFVBenchSummarizer(result, None)
+ print NFVBenchSummarizer(result, fluent_logger)
sys.exit(0)
# show default config in text/yaml format
@@ -531,8 +517,7 @@ def main():
if opts.server:
if os.path.isdir(opts.server):
- server = WebSocketIoServer(opts.server, nfvbench_instance, fluent_logger,
- config.fluentd.result_tag)
+ server = WebSocketIoServer(opts.server, nfvbench_instance, fluent_logger)
nfvbench_instance.set_notifier(server)
try:
port = int(opts.port)
diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py
index 1797496..4772700 100644
--- a/nfvbench/nfvbenchd.py
+++ b/nfvbench/nfvbenchd.py
@@ -26,7 +26,6 @@ from flask import request
from flask_socketio import emit
from flask_socketio import SocketIO
-from fluentd import FluentLogHandler
from summarizer import NFVBenchSummarizer
from log import LOG
@@ -210,17 +209,10 @@ class WebSocketIoServer(object):
of this class and pass a runner object then invoke the run method
"""
- def __init__(self, http_root, runner, logger, result_tag):
+ def __init__(self, http_root, runner, fluent_logger):
self.nfvbench_runner = runner
setup_flask(http_root)
- self.fluent_logger = logger
- self.result_fluent_logger = None
- if result_tag:
- self.result_fluent_logger = \
- FluentLogHandler(result_tag,
- fluentd_ip=self.fluent_logger.sender.host,
- fluentd_port=self.fluent_logger.sender.port)
- self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
+ self.fluent_logger = fluent_logger
def run(self, host='127.0.0.1', port=7556):
@@ -240,6 +232,8 @@ class WebSocketIoServer(object):
# remove unfilled values as we do not want them to override default values with None
config = {k: v for k, v in config.items() if v is not None}
with RunLock():
+ if self.fluent_logger:
+ self.fluent_logger.start_new_run()
results = self.nfvbench_runner.run(config, config)
except Exception as exc:
print 'NFVbench runner exception:'
@@ -252,9 +246,7 @@ class WebSocketIoServer(object):
else:
# this might overwrite a previously unfetched result
Ctx.set_result(results)
- if self.fluent_logger:
- self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
- summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
+ summary = NFVBenchSummarizer(results['result'], self.fluent_logger)
LOG.info(str(summary))
Ctx.release()
if self.fluent_logger:
diff --git a/test/test_nfvbench.py b/test/test_nfvbench.py
index 85342bb..2578407 100644
--- a/test/test_nfvbench.py
+++ b/test/test_nfvbench.py
@@ -786,11 +786,44 @@ def test_config():
def test_fluentd():
logger = logging.getLogger('fluent-logger')
- handler = FluentLogHandler('nfvbench', fluentd_port=7081)
+
+ class FluentdConfig(dict):
+ def __getattr__(self, attr):
+ return self.get(attr)
+
+ fluentd_configs = [
+ FluentdConfig({
+ 'logging_tag': 'nfvbench',
+ 'result_tag': 'resultnfvbench',
+ 'ip': '127.0.0.1',
+ 'port': 7081
+ }),
+ FluentdConfig({
+ 'logging_tag': 'nfvbench',
+ 'result_tag': 'resultnfvbench',
+ 'ip': '127.0.0.1',
+ 'port': 24224
+ }),
+ FluentdConfig({
+ 'logging_tag': None,
+ 'result_tag': 'resultnfvbench',
+ 'ip': '127.0.0.1',
+ 'port': 7082
+ }),
+ FluentdConfig({
+ 'logging_tag': 'nfvbench',
+ 'result_tag': None,
+ 'ip': '127.0.0.1',
+ 'port': 7083
+ })
+ ]
+
+ handler = FluentLogHandler(fluentd_configs=fluentd_configs)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info('test')
logger.warning('test %d', 100)
+
try:
raise Exception("test")
except Exception: