From ef3d8d2223899f70d93be3ffa5ff14e7ad2d421f Mon Sep 17 00:00:00 2001 From: Luc Provoost Date: Mon, 10 May 2021 16:04:22 +0200 Subject: Fix latency buckets with multiple lat cores The bucket data of the last latency core was taken into account when collecting the latency stats. We are now adding the bucket data for all cores. Also changed the error reporting when PROX instance is breaking the pipe connection (e.g. when the dataplane network is overloaded). Cleaning up some trailing spaces in pox_ctrl.py Change-Id: I09ba01ac65e7e4e9ff03ad47da83aa4f83250a67 Signed-off-by: Luc Provoost --- VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py | 47 ++++++---- VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py | 5 +- VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py | 112 ++++++++++++++--------- 3 files changed, 98 insertions(+), 66 deletions(-) diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py index 8cd2e3f2..40375c54 100644 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py @@ -25,7 +25,7 @@ import os import time import subprocess import socket -from rapid_log import RapidLog +from rapid_log import RapidLog class prox_ctrl(object): def __init__(self, ip, key=None, user=None): @@ -223,9 +223,10 @@ class prox_sock(object): result['lat_avg'] += int(stats[2]) #min_since begin = int(stats[3]) #max_since_begin = int(stats[4]) - result['lat_tsc'] = int(stats[5]) # Taking the last tsc as the timestamp since - # PROX will return the same tsc for each - # core/task combination + result['lat_tsc'] = int(stats[5]) + # Taking the last tsc as the timestamp since + # PROX will return the same tsc for each + # core/task combination result['lat_hz'] = int(stats[6]) #coreid = int(stats[7]) #taskid = int(stats[8]) @@ -241,7 +242,7 @@ class prox_sock(object): result['buckets'][0] = int(stats[1]) for i in range(1, 128): stats = self._recv().split(':') - result['buckets'][i] = int(stats[1]) + result['buckets'][i] += int(stats[1]) result['lat_avg'] = old_div(result['lat_avg'], number_tasks_returning_stats) self._send('stats latency(0).used') @@ -252,7 +253,7 @@ class prox_sock(object): return (result) def irq_stats(self, core, bucket, task=0): - self._send('stats task.core(%s).task(%s).irq(%s)' % + self._send('stats task.core(%s).task(%s).irq(%s)' % (core, task, bucket)) stats = self._recv().split(',') return int(stats[0]) @@ -266,12 +267,12 @@ class prox_sock(object): def core_stats(self, cores, tasks=[0]): rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0 - self._send('dp core stats %s %s' % (','.join(map(str, cores)), + self._send('dp core stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks)))) for core in cores: for task in tasks: stats = self._recv().split(',') - if stats[0].startswith('error'): + if stats[0].startswith('error'): if stats[0].startswith('error: invalid syntax'): RapidLog.critical("dp core stats error: unexpected \ invalid syntax (potential incompatibility \ @@ -292,7 +293,7 @@ class prox_sock(object): rx = tx = port_id = tsc = no_mbufs = errors = 0 self._send('multi port stats %s' % (','.join(map(str, ports)))) result = self._recv().split(';') - if result[0].startswith('error'): + if result[0].startswith('error'): RapidLog.critical("multi port stats error: unexpected invalid \ syntax (potential incompatibility between scripts and \ PROX)") @@ -308,19 +309,19 @@ class prox_sock(object): return rx, tx, no_mbufs, errors, tsc def set_random(self, cores, task, offset, mask, length): - self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), + self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, mask, length)) def set_size(self, cores, task, pkt_size): - self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, + self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, pkt_size)) def set_imix(self, cores, task, imix): - self._send('imix %s %s %s' % (','.join(map(str, cores)), task, + self._send('imix %s %s %s' % (','.join(map(str, cores)), task, ','.join(map(str,imix)))) def set_value(self, cores, task, offset, value, length): - self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), + self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, value, length)) def quit_prox(self): @@ -330,16 +331,24 @@ class prox_sock(object): """Append LF and send command to the PROX instance.""" if self._sock is None: raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd) - self._sock.sendall(cmd.encode() + b'\n') + try: + self._sock.sendall(cmd.encode() + b'\n') + except ConnectionResetError as e: + RapidLog.error('Pipe reset by Prox instance: traffic too high?') + raise def _recv(self): """Receive response from PROX instance, return it with LF removed.""" if self._sock is None: raise RuntimeError("PROX socket closed, cannot receive anymore") - pos = self._rcvd.find(b'\n') - while pos == -1: - self._rcvd += self._sock.recv(256) + try: pos = self._rcvd.find(b'\n') - rsp = self._rcvd[:pos] - self._rcvd = self._rcvd[pos+1:] + while pos == -1: + self._rcvd += self._sock.recv(256) + pos = self._rcvd.find(b'\n') + rsp = self._rcvd[:pos] + self._rcvd = self._rcvd[pos+1:] + except ConnectionResetError as e: + RapidLog.error('Pipe reset by Prox instance: traffic too high?') + raise return rsp.decode() diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py index f453c574..1ad54273 100644 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py @@ -120,11 +120,12 @@ class RapidLog(object): @staticmethod def exception(exception_info): RapidLog.log.exception(exception_info) - raise Exception(exception_info) + exit(1) + @staticmethod def critical(critical_info): RapidLog.log.critical(critical_info) - raise Exception(critical_info) + exit(1) @staticmethod def error(error_info): diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py index 33fb8dfc..5f78ec01 100755 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py @@ -87,52 +87,74 @@ class RapidTestManager(object): if machine_params['prox_socket']: sut_machine = machine self.machines.append(machine) - prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) - self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines} - if configonly: - concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED) - sys.exit() - with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) as executor: - future_to_connect_prox = {executor.submit(machine.connect_prox): machine for machine in self.machines} + try: + prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) + self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines} + if configonly: + concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED) + sys.exit() + socket_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) + future_to_connect_prox = {socket_executor.submit(machine.connect_prox): machine for machine in self.machines} concurrent.futures.wait(future_to_connect_prox,return_when=ALL_COMPLETED) - result = 0 - for test_param in test_params['tests']: - RapidLog.info(test_param['test']) - if test_param['test'] in ['flowsizetest', 'TST009test', - 'fixed_rate', 'increment_till_fail']: - test = FlowSizeTest(test_param, test_params['lat_percentile'], - test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], gen_machine, - sut_machine, background_machines) - elif test_param['test'] in ['corestatstest']: - test = CoreStatsTest(test_param, test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], self.machines) - elif test_param['test'] in ['portstatstest']: - test = PortStatsTest(test_param, test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], self.machines) - elif test_param['test'] in ['impairtest']: - test = ImpairTest(test_param, test_params['lat_percentile'], - test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], gen_machine, - sut_machine, background_machines) - elif test_param['test'] in ['irqtest']: - test = IrqTest(test_param, test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], self.machines) - elif test_param['test'] in ['warmuptest']: - test = WarmupTest(test_param, gen_machine) - else: - RapidLog.debug('Test name ({}) is not valid:'.format( - test_param['test'])) - single_test_result, result_details = test.run() - result = result + single_test_result - for machine in self.machines: - machine.close_prox() - concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED) + result = 0 + for test_param in test_params['tests']: + RapidLog.info(test_param['test']) + if test_param['test'] in ['flowsizetest', 'TST009test', + 'fixed_rate', 'increment_till_fail']: + test = FlowSizeTest(test_param, + test_params['lat_percentile'], + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + gen_machine, + sut_machine, background_machines) + elif test_param['test'] in ['corestatstest']: + test = CoreStatsTest(test_param, + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + self.machines) + elif test_param['test'] in ['portstatstest']: + test = PortStatsTest(test_param, + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + self.machines) + elif test_param['test'] in ['impairtest']: + test = ImpairTest(test_param, + test_params['lat_percentile'], + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + gen_machine, + sut_machine, background_machines) + elif test_param['test'] in ['irqtest']: + test = IrqTest(test_param, + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + self.machines) + elif test_param['test'] in ['warmuptest']: + test = WarmupTest(test_param, + gen_machine) + else: + RapidLog.debug('Test name ({}) is not valid:'.format( + test_param['test'])) + single_test_result, result_details = test.run() + result = result + single_test_result + for machine in self.machines: + machine.close_prox() + concurrent.futures.wait(self.future_to_prox, + return_when=ALL_COMPLETED) + except (ConnectionError, KeyboardInterrupt) as e: + result = result_details = None + socket_executor.shutdown(wait=False) + socket_executor._threads.clear() + prox_executor.shutdown(wait=False) + prox_executor._threads.clear() + concurrent.futures.thread._threads_queues.clear() + RapidLog.error("Test interrupted: {} {}".format( + type(e).__name__,e)) return (result, result_details) def main(): -- cgit 1.2.3-korg