diff options
-rw-r--r-- | VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py | 47 | ||||
-rw-r--r-- | VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py | 5 | ||||
-rwxr-xr-x | VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py | 112 |
3 files changed, 98 insertions, 66 deletions
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py index 8cd2e3f2..40375c54 100644 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py @@ -25,7 +25,7 @@ import os import time import subprocess import socket -from rapid_log import RapidLog +from rapid_log import RapidLog class prox_ctrl(object): def __init__(self, ip, key=None, user=None): @@ -223,9 +223,10 @@ class prox_sock(object): result['lat_avg'] += int(stats[2]) #min_since begin = int(stats[3]) #max_since_begin = int(stats[4]) - result['lat_tsc'] = int(stats[5]) # Taking the last tsc as the timestamp since - # PROX will return the same tsc for each - # core/task combination + result['lat_tsc'] = int(stats[5]) + # Taking the last tsc as the timestamp since + # PROX will return the same tsc for each + # core/task combination result['lat_hz'] = int(stats[6]) #coreid = int(stats[7]) #taskid = int(stats[8]) @@ -241,7 +242,7 @@ class prox_sock(object): result['buckets'][0] = int(stats[1]) for i in range(1, 128): stats = self._recv().split(':') - result['buckets'][i] = int(stats[1]) + result['buckets'][i] += int(stats[1]) result['lat_avg'] = old_div(result['lat_avg'], number_tasks_returning_stats) self._send('stats latency(0).used') @@ -252,7 +253,7 @@ class prox_sock(object): return (result) def irq_stats(self, core, bucket, task=0): - self._send('stats task.core(%s).task(%s).irq(%s)' % + self._send('stats task.core(%s).task(%s).irq(%s)' % (core, task, bucket)) stats = self._recv().split(',') return int(stats[0]) @@ -266,12 +267,12 @@ class prox_sock(object): def core_stats(self, cores, tasks=[0]): rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0 - self._send('dp core stats %s %s' % (','.join(map(str, cores)), + self._send('dp core stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks)))) for core in cores: for task in tasks: stats = self._recv().split(',') - if stats[0].startswith('error'): + if stats[0].startswith('error'): if stats[0].startswith('error: invalid syntax'): RapidLog.critical("dp core stats error: unexpected \ invalid syntax (potential incompatibility \ @@ -292,7 +293,7 @@ class prox_sock(object): rx = tx = port_id = tsc = no_mbufs = errors = 0 self._send('multi port stats %s' % (','.join(map(str, ports)))) result = self._recv().split(';') - if result[0].startswith('error'): + if result[0].startswith('error'): RapidLog.critical("multi port stats error: unexpected invalid \ syntax (potential incompatibility between scripts and \ PROX)") @@ -308,19 +309,19 @@ class prox_sock(object): return rx, tx, no_mbufs, errors, tsc def set_random(self, cores, task, offset, mask, length): - self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), + self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, mask, length)) def set_size(self, cores, task, pkt_size): - self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, + self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, pkt_size)) def set_imix(self, cores, task, imix): - self._send('imix %s %s %s' % (','.join(map(str, cores)), task, + self._send('imix %s %s %s' % (','.join(map(str, cores)), task, ','.join(map(str,imix)))) def set_value(self, cores, task, offset, value, length): - self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), + self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, value, length)) def quit_prox(self): @@ -330,16 +331,24 @@ class prox_sock(object): """Append LF and send command to the PROX instance.""" if self._sock is None: raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd) - self._sock.sendall(cmd.encode() + b'\n') + try: + self._sock.sendall(cmd.encode() + b'\n') + except ConnectionResetError as e: + RapidLog.error('Pipe reset by Prox instance: traffic too high?') + raise def _recv(self): """Receive response from PROX instance, return it with LF removed.""" if self._sock is None: raise RuntimeError("PROX socket closed, cannot receive anymore") - pos = self._rcvd.find(b'\n') - while pos == -1: - self._rcvd += self._sock.recv(256) + try: pos = self._rcvd.find(b'\n') - rsp = self._rcvd[:pos] - self._rcvd = self._rcvd[pos+1:] + while pos == -1: + self._rcvd += self._sock.recv(256) + pos = self._rcvd.find(b'\n') + rsp = self._rcvd[:pos] + self._rcvd = self._rcvd[pos+1:] + except ConnectionResetError as e: + RapidLog.error('Pipe reset by Prox instance: traffic too high?') + raise return rsp.decode() diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py index f453c574..1ad54273 100644 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py @@ -120,11 +120,12 @@ class RapidLog(object): @staticmethod def exception(exception_info): RapidLog.log.exception(exception_info) - raise Exception(exception_info) + exit(1) + @staticmethod def critical(critical_info): RapidLog.log.critical(critical_info) - raise Exception(critical_info) + exit(1) @staticmethod def error(error_info): diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py index 33fb8dfc..5f78ec01 100755 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py @@ -87,52 +87,74 @@ class RapidTestManager(object): if machine_params['prox_socket']: sut_machine = machine self.machines.append(machine) - prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) - self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines} - if configonly: - concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED) - sys.exit() - with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) as executor: - future_to_connect_prox = {executor.submit(machine.connect_prox): machine for machine in self.machines} + try: + prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) + self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines} + if configonly: + concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED) + sys.exit() + socket_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) + future_to_connect_prox = {socket_executor.submit(machine.connect_prox): machine for machine in self.machines} concurrent.futures.wait(future_to_connect_prox,return_when=ALL_COMPLETED) - result = 0 - for test_param in test_params['tests']: - RapidLog.info(test_param['test']) - if test_param['test'] in ['flowsizetest', 'TST009test', - 'fixed_rate', 'increment_till_fail']: - test = FlowSizeTest(test_param, test_params['lat_percentile'], - test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], gen_machine, - sut_machine, background_machines) - elif test_param['test'] in ['corestatstest']: - test = CoreStatsTest(test_param, test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], self.machines) - elif test_param['test'] in ['portstatstest']: - test = PortStatsTest(test_param, test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], self.machines) - elif test_param['test'] in ['impairtest']: - test = ImpairTest(test_param, test_params['lat_percentile'], - test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], gen_machine, - sut_machine, background_machines) - elif test_param['test'] in ['irqtest']: - test = IrqTest(test_param, test_params['runtime'], - test_params['TestName'], - test_params['environment_file'], self.machines) - elif test_param['test'] in ['warmuptest']: - test = WarmupTest(test_param, gen_machine) - else: - RapidLog.debug('Test name ({}) is not valid:'.format( - test_param['test'])) - single_test_result, result_details = test.run() - result = result + single_test_result - for machine in self.machines: - machine.close_prox() - concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED) + result = 0 + for test_param in test_params['tests']: + RapidLog.info(test_param['test']) + if test_param['test'] in ['flowsizetest', 'TST009test', + 'fixed_rate', 'increment_till_fail']: + test = FlowSizeTest(test_param, + test_params['lat_percentile'], + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + gen_machine, + sut_machine, background_machines) + elif test_param['test'] in ['corestatstest']: + test = CoreStatsTest(test_param, + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + self.machines) + elif test_param['test'] in ['portstatstest']: + test = PortStatsTest(test_param, + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + self.machines) + elif test_param['test'] in ['impairtest']: + test = ImpairTest(test_param, + test_params['lat_percentile'], + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + gen_machine, + sut_machine, background_machines) + elif test_param['test'] in ['irqtest']: + test = IrqTest(test_param, + test_params['runtime'], + test_params['TestName'], + test_params['environment_file'], + self.machines) + elif test_param['test'] in ['warmuptest']: + test = WarmupTest(test_param, + gen_machine) + else: + RapidLog.debug('Test name ({}) is not valid:'.format( + test_param['test'])) + single_test_result, result_details = test.run() + result = result + single_test_result + for machine in self.machines: + machine.close_prox() + concurrent.futures.wait(self.future_to_prox, + return_when=ALL_COMPLETED) + except (ConnectionError, KeyboardInterrupt) as e: + result = result_details = None + socket_executor.shutdown(wait=False) + socket_executor._threads.clear() + prox_executor.shutdown(wait=False) + prox_executor._threads.clear() + concurrent.futures.thread._threads_queues.clear() + RapidLog.error("Test interrupted: {} {}".format( + type(e).__name__,e)) return (result, result_details) def main(): |