From f6a699e63dae2bb5779bd757dc62217193139ad9 Mon Sep 17 00:00:00 2001 From: Kerim Gokarslan Date: Wed, 13 Sep 2017 20:26:01 -0700 Subject: NFVBENCH-25 Send run results to fluentd Change-Id: I671a9297b90784bc30eee48ea9244a9c63a24e85 Signed-off-by: Kerim Gokarslan --- nfvbench/fluentd.py | 4 ++ nfvbench/nfvbench.py | 14 ++-- nfvbench/nfvbenchd.py | 12 +++- nfvbench/summarizer.py | 120 +++++++++++++++++++++++++++++++--- nfvbench/traffic_client.py | 6 +- nfvbench/traffic_gen/traffic_utils.py | 4 +- nfvbench/traffic_gen/trex.py | 33 +++++----- 7 files changed, 157 insertions(+), 36 deletions(-) diff --git a/nfvbench/fluentd.py b/nfvbench/fluentd.py index 93c90fa..a9bda62 100644 --- a/nfvbench/fluentd.py +++ b/nfvbench/fluentd.py @@ -61,6 +61,10 @@ class FluentLogHandler(logging.Handler): self.__update_stats(record.levelno) self.sender.emit(None, data) + # this function is called by summarizer + def record_send(self, record): + self.sender.emit(None, record) + # send START record for each run def __send_start_record(self): data = { diff --git a/nfvbench/nfvbench.py b/nfvbench/nfvbench.py index 67b953f..920838a 100644 --- a/nfvbench/nfvbench.py +++ b/nfvbench/nfvbench.py @@ -127,9 +127,13 @@ class NFVBench(object): 'error_message': message } - def print_summary(self, result): - """Print summary of the result""" - summary = NFVBenchSummarizer(result) + def prepare_summary(self, result): + """Prepares summary of the result to print and send it to logger (eg: fluentd)""" + sender = FluentLogHandler("resultnfvbench", + fluentd_ip=self.config.fluentd.ip, + fluentd_port=self.config.fluentd.port) \ + if self.config.fluentd.logging_tag else None + summary = NFVBenchSummarizer(result, sender) LOG.info(str(summary)) def save(self, result): @@ -453,7 +457,7 @@ def main(): if opts.summary: with open(opts.summary) as json_data: - print NFVBenchSummarizer(json.load(json_data)) + print NFVBenchSummarizer(json.load(json_data), None) sys.exit(0) # show default config in text/yaml format @@ -539,7 +543,7 @@ def main(): if 'result' in result and result['status']: nfvbench.save(result['result']) - nfvbench.print_summary(result['result']) + nfvbench.prepare_summary(result['result']) except Exception as exc: run_summary_required = True LOG.error({ diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py index 3534950..3ab30de 100644 --- a/nfvbench/nfvbenchd.py +++ b/nfvbench/nfvbenchd.py @@ -21,6 +21,8 @@ from flask import request from flask_socketio import emit from flask_socketio import SocketIO +from fluentd import FluentLogHandler +from summarizer import NFVBenchSummarizer import json from log import LOG @@ -211,10 +213,15 @@ class WebSocketIoServer(object): notifications using websocket events (send_ methods). Caller should simply create an instance of this class and pass a runner object then invoke the run method """ + def __init__(self, http_root, runner, logger): self.nfvbench_runner = runner setup_flask(http_root) self.fluent_logger = logger + self.result_fluent_logger = FluentLogHandler("resultnfvbench", + fluentd_ip=self.fluent_logger.sender.host, + fluentd_port=self.fluent_logger.sender.port) \ + if self.fluent_logger else None def run(self, host='127.0.0.1', port=7556): @@ -246,8 +253,11 @@ class WebSocketIoServer(object): else: # this might overwrite a previously unfetched result Ctx.set_result(results) + summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger) + LOG.info(str(summary)) Ctx.release() - self.fluent_logger.send_run_summary(True) + if self.fluent_logger: + self.fluent_logger.send_run_summary(True) def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct): stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct} diff --git a/nfvbench/summarizer.py b/nfvbench/summarizer.py index 4ee2426..1eaa8d6 100644 --- a/nfvbench/summarizer.py +++ b/nfvbench/summarizer.py @@ -16,7 +16,9 @@ import bitmath from contextlib import contextmanager +from datetime import datetime import math +import pytz from specs import ChainType from tabulate import tabulate @@ -90,7 +92,7 @@ class Table(object): self.columns = len(header_row) def add_row(self, row): - assert(self.columns == len(row)) + assert (self.columns == len(row)) formatted_row = [] for entry, formatter in zip(row, self.formatters): formatted_row.append(formatter(entry)) @@ -121,7 +123,7 @@ class Summarizer(object): self.marker_stack.append(marker) def __unindent(self): - assert(self.indent_size >= self.indent_per_level) + assert (self.indent_size >= self.indent_per_level) self.indent_size -= self.indent_per_level self.marker_stack.pop() @@ -208,10 +210,16 @@ class NFVBenchSummarizer(Summarizer): direction_keys = ['direction-forward', 'direction-reverse', 'direction-total'] direction_names = ['Forward', 'Reverse', 'Total'] - def __init__(self, result): + def __init__(self, result, sender): Summarizer.__init__(self) self.result = result self.config = self.result['config'] + self.record_header = None + self.record_data = None + self.sender = sender + # if sender is available initialize record + if self.sender: + self.__record_init() self.__summarize() def __summarize(self): @@ -223,6 +231,9 @@ class NFVBenchSummarizer(Summarizer): 'vSwitch': self.result['openstack_spec']['vswitch'], 'Encapsulation': self.result['openstack_spec']['encaps'] }) + self.__record_header_put('version', self.result['nfvbench_version']) + self.__record_header_put('vSwitch', self.result['openstack_spec']['vswitch']) + self.__record_header_put('Encapsulation', self.result['openstack_spec']['encaps']) self._put('Benchmarks:') with self._create_block(): self._put('Networks:') @@ -251,7 +262,6 @@ class NFVBenchSummarizer(Summarizer): self._put('NDR:', self.config['measurement']['NDR']) if self.config['pdr_run']: self._put('PDR:', self.config['measurement']['PDR']) - self._put('Service chain:') for result in network_benchmark['service_chain'].iteritems(): with self._create_block(): @@ -261,6 +271,8 @@ class NFVBenchSummarizer(Summarizer): self._put(chain_name + ':') if chain_name == ChainType.PVVP: self._put('Mode:', chain_benchmark.get('mode')) + chain_name += "-" + chain_benchmark.get('mode') + self.__record_header_put('service_chain', chain_name) with self._create_block(): self._put('Traffic:') with self._create_block(False): @@ -272,6 +284,12 @@ class NFVBenchSummarizer(Summarizer): self._put('Flow count:', traffic_benchmark['flow_count']) self._put('Service chains count:', traffic_benchmark['service_chain_count']) self._put('Compute nodes:', traffic_benchmark['compute_nodes'].keys()) + + self.__record_header_put('profile', traffic_benchmark['profile']) + self.__record_header_put('bidirectional', traffic_benchmark['bidirectional']) + self.__record_header_put('flow_count', traffic_benchmark['flow_count']) + self.__record_header_put('sc_count', traffic_benchmark['service_chain_count']) + self.__record_header_put('compute_nodes', traffic_benchmark['compute_nodes'].keys()) with self._create_block(False): self._put() if not self.config['no_traffic']: @@ -289,6 +307,7 @@ class NFVBenchSummarizer(Summarizer): if 'warning' in entry: continue self.__chain_analysis_summarize(*entry) + self.__record_send() def __chain_analysis_summarize(self, frame_size, analysis): self._put() @@ -296,19 +315,25 @@ class NFVBenchSummarizer(Summarizer): if 'analysis_duration_sec' in analysis: self._put('Chain analysis duration:', Formatter.float(3)(analysis['analysis_duration_sec']), 'seconds') + self.__record_data_put(frame_size, {'chain_analysis_duration': Formatter.float(3)( + analysis['analysis_duration_sec'])}) if self.config['ndr_run']: self._put('NDR search duration:', Formatter.float(0)(analysis['ndr']['time_taken_sec']), 'seconds') + self.__record_data_put(frame_size, {'ndr_search_duration': Formatter.float(0)( + analysis['ndr']['time_taken_sec'])}) if self.config['pdr_run']: self._put('PDR search duration:', Formatter.float(0)(analysis['pdr']['time_taken_sec']), 'seconds') + self.__record_data_put(frame_size, {'pdr_search_duration': Formatter.float(0)( + analysis['pdr']['time_taken_sec'])}) self._put() if not self.config['no_traffic'] and self.config['single_run']: self._put('Run Config:') self._put() with self._create_block(False): - self._put_table(self.__get_config_table(analysis['run_config'])) + self._put_table(self.__get_config_table(analysis['run_config'], frame_size)) if 'warning' in analysis['run_config'] and analysis['run_config']['warning']: self._put() self._put(analysis['run_config']['warning']) @@ -335,12 +360,21 @@ class NFVBenchSummarizer(Summarizer): 'NDR', frame_size, analysis['ndr']['rate_bps'], - int(analysis['ndr']['rate_pps']), + analysis['ndr']['rate_pps'], analysis['ndr']['stats']['overall']['drop_percentage'], analysis['ndr']['stats']['overall']['avg_delay_usec'], analysis['ndr']['stats']['overall']['min_delay_usec'], analysis['ndr']['stats']['overall']['max_delay_usec'] ]) + self.__record_data_put(frame_size, {'ndr': { + 'type': 'NDR', + 'rate_bps': analysis['ndr']['rate_bps'], + 'rate_pps': analysis['ndr']['rate_pps'], + 'drop_percantage': analysis['ndr']['stats']['overall']['drop_percentage'], + 'avg_delay_usec': analysis['ndr']['stats']['overall']['avg_delay_usec'], + 'min_delay_usec': analysis['ndr']['stats']['overall']['min_delay_usec'], + 'max_delay_usec': analysis['ndr']['stats']['overall']['max_delay_usec'] + }}) if self.config['pdr_run']: for frame_size, analysis in traffic_result.iteritems(): if frame_size == 'warning': @@ -349,12 +383,21 @@ class NFVBenchSummarizer(Summarizer): 'PDR', frame_size, analysis['pdr']['rate_bps'], - int(analysis['pdr']['rate_pps']), + analysis['pdr']['rate_pps'], analysis['pdr']['stats']['overall']['drop_percentage'], analysis['pdr']['stats']['overall']['avg_delay_usec'], analysis['pdr']['stats']['overall']['min_delay_usec'], analysis['pdr']['stats']['overall']['max_delay_usec'] ]) + self.__record_data_put(frame_size, {'pdr': { + 'type': 'PDR', + 'rate_bps': analysis['pdr']['rate_bps'], + 'rate_pps': analysis['pdr']['rate_pps'], + 'drop_percantage': analysis['pdr']['stats']['overall']['drop_percentage'], + 'avg_delay_usec': analysis['pdr']['stats']['overall']['avg_delay_usec'], + 'min_delay_usec': analysis['pdr']['stats']['overall']['min_delay_usec'], + 'max_delay_usec': analysis['pdr']['stats']['overall']['max_delay_usec'] + }}) if self.config['single_run']: for frame_size, analysis in traffic_result.iteritems(): summary_table.add_row([ @@ -364,9 +407,16 @@ class NFVBenchSummarizer(Summarizer): analysis['stats']['overall']['rx']['min_delay_usec'], analysis['stats']['overall']['rx']['max_delay_usec'] ]) + self.__record_data_put(frame_size, {'single_run': { + 'type': 'single_run', + 'drop_rate_percent': analysis['stats']['overall']['drop_rate_percent'], + 'avg_delay_usec': analysis['stats']['overall']['rx']['avg_delay_usec'], + 'min_delay_usec': analysis['stats']['overall']['rx']['min_delay_usec'], + 'max_delay_usec': analysis['stats']['overall']['rx']['max_delay_usec'] + }}) return summary_table - def __get_config_table(self, run_config): + def __get_config_table(self, run_config, frame_size): config_table = Table(self.config_header) for key, name in zip(self.direction_keys, self.direction_names): if key not in run_config: @@ -380,6 +430,15 @@ class NFVBenchSummarizer(Summarizer): int(run_config[key]['tx']['rate_pps']), int(run_config[key]['rx']['rate_pps']), ]) + self.__record_data_put(frame_size, { + name.lower() + "_orig_rate_bps": int(run_config[key]['orig']['rate_bps']), + name.lower() + "_tx_rate_bps": int(run_config[key]['tx']['rate_bps']), + name.lower() + "_rx_rate_bps": int(run_config[key]['rx']['rate_bps']), + name.lower() + "_orig_rate_pps": int(run_config[key]['orig']['rate_pps']), + name.lower() + "_tx_rate_pps": int(run_config[key]['tx']['rate_pps']), + name.lower() + "_rx_rate_pps": int(run_config[key]['rx']['rate_pps']), + + }) return config_table def __get_chain_analysis_table(self, packet_analysis): @@ -387,7 +446,6 @@ class NFVBenchSummarizer(Summarizer): forward_analysis = packet_analysis['direction-forward'] reverse_analysis = packet_analysis['direction-reverse'] reverse_analysis.reverse() - for fwd, rev in zip(forward_analysis, reverse_analysis): chain_analysis_table.add_row([ fwd['interface'], @@ -400,3 +458,47 @@ class NFVBenchSummarizer(Summarizer): rev.get('packet_drop_percentage', None), ]) return chain_analysis_table + + def __record_header_put(self, key, value): + if self.sender: + self.record_header[key] = value + + def __record_data_put(self, key, data): + if self.sender: + if key not in self.record_data: + self.record_data[key] = {} + self.record_data[key].update(data) + + def __record_send(self): + if self.sender: + self.record_header["@timestamp"] = datetime.utcnow().replace( + tzinfo=pytz.utc).strftime( + "%Y-%m-%dT%H:%M:%S.%f%z") + for frame_size in self.record_data: + data = self.record_header + data['frame_size'] = frame_size + data.update(self.record_data[frame_size]) + run_specific_data = {} + if 'single_run' in data: + run_specific_data['single_run'] = data['single_run'] + del data['single_run'] + if 'ndr' in data: + run_specific_data['ndr'] = data['ndr'] + run_specific_data['ndr']['drop_limit'] = self.config['measurement']['NDR'] + del data['ndr'] + if 'pdr' in data: + run_specific_data['pdr'] = data['pdr'] + run_specific_data['pdr']['drop_limit'] = self.config['measurement']['PDR'] + del data['pdr'] + for key in run_specific_data: + data_to_send = data.copy() + data_to_send.update(run_specific_data[key]) + self.sender.record_send(data_to_send) + self.__record_init() + + def __record_init(self): + # init is called after checking for sender + self.record_header = { + "runlogdate": self.sender.runlogdate, + } + self.record_data = {} diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py index 7c8367a..27ff227 100644 --- a/nfvbench/traffic_client.py +++ b/nfvbench/traffic_client.py @@ -537,9 +537,9 @@ class TrafficClient(object): retDict[port]['rx'][key] = int(stats[port]['rx'][key]) except ValueError: retDict[port]['rx'][key] = 0 - retDict[port]['rx']['avg_delay_usec'] = float(stats[port]['rx']['avg_delay_usec']) - retDict[port]['rx']['min_delay_usec'] = float(stats[port]['rx']['min_delay_usec']) - retDict[port]['rx']['max_delay_usec'] = float(stats[port]['rx']['max_delay_usec']) + retDict[port]['rx']['avg_delay_usec'] = int(stats[port]['rx']['avg_delay_usec']) + retDict[port]['rx']['min_delay_usec'] = int(stats[port]['rx']['min_delay_usec']) + retDict[port]['rx']['max_delay_usec'] = int(stats[port]['rx']['max_delay_usec']) retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port]) ports = sorted(retDict.keys()) diff --git a/nfvbench/traffic_gen/traffic_utils.py b/nfvbench/traffic_gen/traffic_utils.py index e5dc463..d3def42 100644 --- a/nfvbench/traffic_gen/traffic_utils.py +++ b/nfvbench/traffic_gen/traffic_utils.py @@ -39,9 +39,9 @@ def convert_rates(l2frame_size, rate, intf_speed): return { 'initial_rate_type': initial_rate_type, - 'rate_pps': pps, + 'rate_pps': int(pps), 'rate_percent': load, - 'rate_bps': bps + 'rate_bps': int(bps) } diff --git a/nfvbench/traffic_gen/trex.py b/nfvbench/traffic_gen/trex.py index 8aca290..a9effab 100644 --- a/nfvbench/traffic_gen/trex.py +++ b/nfvbench/traffic_gen/trex.py @@ -27,7 +27,6 @@ from traffic_base import AbstractTrafficGenerator from traffic_base import TrafficGeneratorException import traffic_utils as utils - from trex_stl_lib.api import CTRexVmInsFixHwCs from trex_stl_lib.api import Dot1Q from trex_stl_lib.api import Ether @@ -49,7 +48,6 @@ from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP class TRex(AbstractTrafficGenerator): - LATENCY_PPS = 1000 def __init__(self, runner): @@ -75,27 +73,30 @@ class TRex(AbstractTrafficGenerator): stats = self.__combine_stats(in_stats, ph) result[ph] = { 'tx': { - 'total_pkts': stats['tx_pkts']['total'], - 'total_pkt_bytes': stats['tx_bytes']['total'], - 'pkt_rate': stats['tx_pps']['total'], - 'pkt_bit_rate': stats['tx_bps']['total'] + 'total_pkts': int(stats['tx_pkts']['total']), + 'total_pkt_bytes': int(stats['tx_bytes']['total']), + 'pkt_rate': int(stats['tx_pps']['total']), + 'pkt_bit_rate': int(stats['tx_bps']['total']) }, 'rx': { - 'total_pkts': stats['rx_pkts']['total'], - 'total_pkt_bytes': stats['rx_bytes']['total'], - 'pkt_rate': stats['rx_pps']['total'], - 'pkt_bit_rate': stats['rx_bps']['total'], - 'dropped_pkts': stats['tx_pkts']['total'] - stats['rx_pkts']['total'] + 'total_pkts': int(stats['rx_pkts']['total']), + 'total_pkt_bytes': int(stats['rx_bytes']['total']), + 'pkt_rate': int(stats['rx_pps']['total']), + 'pkt_bit_rate': int(stats['rx_bps']['total']), + 'dropped_pkts': int(stats['tx_pkts']['total'] - stats['rx_pkts']['total']) } } lat = self.__combine_latencies(in_stats, ph) - result[ph]['rx']['max_delay_usec'] = lat.get('total_max', float('nan')) - result[ph]['rx']['min_delay_usec'] = lat.get('total_min', float('nan')) - result[ph]['rx']['avg_delay_usec'] = lat.get('average', float('nan')) - + result[ph]['rx']['max_delay_usec'] = int( + lat['total_max']) if 'total_max' in lat else float('nan') + result[ph]['rx']['min_delay_usec'] = int( + lat['total_min']) if 'total_min' in lat else float('nan') + result[ph]['rx']['avg_delay_usec'] = int( + lat['average']) if 'average' in lat else float( + 'nan') total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts'] - result["total_tx_rate"] = total_tx_pkts / self.config.duration_sec + result["total_tx_rate"] = int(total_tx_pkts / self.config.duration_sec) return result def __combine_stats(self, in_stats, port_handle): -- cgit 1.2.3-korg