summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py47
-rw-r--r--VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py5
-rwxr-xr-xVNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py112
3 files changed, 98 insertions, 66 deletions
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
index 8cd2e3f2..40375c54 100644
--- a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
+++ b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
@@ -25,7 +25,7 @@ import os
import time
import subprocess
import socket
-from rapid_log import RapidLog
+from rapid_log import RapidLog
class prox_ctrl(object):
def __init__(self, ip, key=None, user=None):
@@ -223,9 +223,10 @@ class prox_sock(object):
result['lat_avg'] += int(stats[2])
#min_since begin = int(stats[3])
#max_since_begin = int(stats[4])
- result['lat_tsc'] = int(stats[5]) # Taking the last tsc as the timestamp since
- # PROX will return the same tsc for each
- # core/task combination
+ result['lat_tsc'] = int(stats[5])
+ # Taking the last tsc as the timestamp since
+ # PROX will return the same tsc for each
+ # core/task combination
result['lat_hz'] = int(stats[6])
#coreid = int(stats[7])
#taskid = int(stats[8])
@@ -241,7 +242,7 @@ class prox_sock(object):
result['buckets'][0] = int(stats[1])
for i in range(1, 128):
stats = self._recv().split(':')
- result['buckets'][i] = int(stats[1])
+ result['buckets'][i] += int(stats[1])
result['lat_avg'] = old_div(result['lat_avg'],
number_tasks_returning_stats)
self._send('stats latency(0).used')
@@ -252,7 +253,7 @@ class prox_sock(object):
return (result)
def irq_stats(self, core, bucket, task=0):
- self._send('stats task.core(%s).task(%s).irq(%s)' %
+ self._send('stats task.core(%s).task(%s).irq(%s)' %
(core, task, bucket))
stats = self._recv().split(',')
return int(stats[0])
@@ -266,12 +267,12 @@ class prox_sock(object):
def core_stats(self, cores, tasks=[0]):
rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
- self._send('dp core stats %s %s' % (','.join(map(str, cores)),
+ self._send('dp core stats %s %s' % (','.join(map(str, cores)),
','.join(map(str, tasks))))
for core in cores:
for task in tasks:
stats = self._recv().split(',')
- if stats[0].startswith('error'):
+ if stats[0].startswith('error'):
if stats[0].startswith('error: invalid syntax'):
RapidLog.critical("dp core stats error: unexpected \
invalid syntax (potential incompatibility \
@@ -292,7 +293,7 @@ class prox_sock(object):
rx = tx = port_id = tsc = no_mbufs = errors = 0
self._send('multi port stats %s' % (','.join(map(str, ports))))
result = self._recv().split(';')
- if result[0].startswith('error'):
+ if result[0].startswith('error'):
RapidLog.critical("multi port stats error: unexpected invalid \
syntax (potential incompatibility between scripts and \
PROX)")
@@ -308,19 +309,19 @@ class prox_sock(object):
return rx, tx, no_mbufs, errors, tsc
def set_random(self, cores, task, offset, mask, length):
- self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
+ self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
task, offset, mask, length))
def set_size(self, cores, task, pkt_size):
- self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
+ self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
pkt_size))
def set_imix(self, cores, task, imix):
- self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
+ self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
','.join(map(str,imix))))
def set_value(self, cores, task, offset, value, length):
- self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
+ self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
task, offset, value, length))
def quit_prox(self):
@@ -330,16 +331,24 @@ class prox_sock(object):
"""Append LF and send command to the PROX instance."""
if self._sock is None:
raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
- self._sock.sendall(cmd.encode() + b'\n')
+ try:
+ self._sock.sendall(cmd.encode() + b'\n')
+ except ConnectionResetError as e:
+ RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+ raise
def _recv(self):
"""Receive response from PROX instance, return it with LF removed."""
if self._sock is None:
raise RuntimeError("PROX socket closed, cannot receive anymore")
- pos = self._rcvd.find(b'\n')
- while pos == -1:
- self._rcvd += self._sock.recv(256)
+ try:
pos = self._rcvd.find(b'\n')
- rsp = self._rcvd[:pos]
- self._rcvd = self._rcvd[pos+1:]
+ while pos == -1:
+ self._rcvd += self._sock.recv(256)
+ pos = self._rcvd.find(b'\n')
+ rsp = self._rcvd[:pos]
+ self._rcvd = self._rcvd[pos+1:]
+ except ConnectionResetError as e:
+ RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+ raise
return rsp.decode()
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py
index f453c574..1ad54273 100644
--- a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py
+++ b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py
@@ -120,11 +120,12 @@ class RapidLog(object):
@staticmethod
def exception(exception_info):
RapidLog.log.exception(exception_info)
- raise Exception(exception_info)
+ exit(1)
+ @staticmethod
def critical(critical_info):
RapidLog.log.critical(critical_info)
- raise Exception(critical_info)
+ exit(1)
@staticmethod
def error(error_info):
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py
index 33fb8dfc..5f78ec01 100755
--- a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py
+++ b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py
@@ -87,52 +87,74 @@ class RapidTestManager(object):
if machine_params['prox_socket']:
sut_machine = machine
self.machines.append(machine)
- prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines))
- self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines}
- if configonly:
- concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED)
- sys.exit()
- with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) as executor:
- future_to_connect_prox = {executor.submit(machine.connect_prox): machine for machine in self.machines}
+ try:
+ prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines))
+ self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines}
+ if configonly:
+ concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED)
+ sys.exit()
+ socket_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines))
+ future_to_connect_prox = {socket_executor.submit(machine.connect_prox): machine for machine in self.machines}
concurrent.futures.wait(future_to_connect_prox,return_when=ALL_COMPLETED)
- result = 0
- for test_param in test_params['tests']:
- RapidLog.info(test_param['test'])
- if test_param['test'] in ['flowsizetest', 'TST009test',
- 'fixed_rate', 'increment_till_fail']:
- test = FlowSizeTest(test_param, test_params['lat_percentile'],
- test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], gen_machine,
- sut_machine, background_machines)
- elif test_param['test'] in ['corestatstest']:
- test = CoreStatsTest(test_param, test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], self.machines)
- elif test_param['test'] in ['portstatstest']:
- test = PortStatsTest(test_param, test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], self.machines)
- elif test_param['test'] in ['impairtest']:
- test = ImpairTest(test_param, test_params['lat_percentile'],
- test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], gen_machine,
- sut_machine, background_machines)
- elif test_param['test'] in ['irqtest']:
- test = IrqTest(test_param, test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], self.machines)
- elif test_param['test'] in ['warmuptest']:
- test = WarmupTest(test_param, gen_machine)
- else:
- RapidLog.debug('Test name ({}) is not valid:'.format(
- test_param['test']))
- single_test_result, result_details = test.run()
- result = result + single_test_result
- for machine in self.machines:
- machine.close_prox()
- concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED)
+ result = 0
+ for test_param in test_params['tests']:
+ RapidLog.info(test_param['test'])
+ if test_param['test'] in ['flowsizetest', 'TST009test',
+ 'fixed_rate', 'increment_till_fail']:
+ test = FlowSizeTest(test_param,
+ test_params['lat_percentile'],
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ gen_machine,
+ sut_machine, background_machines)
+ elif test_param['test'] in ['corestatstest']:
+ test = CoreStatsTest(test_param,
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ self.machines)
+ elif test_param['test'] in ['portstatstest']:
+ test = PortStatsTest(test_param,
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ self.machines)
+ elif test_param['test'] in ['impairtest']:
+ test = ImpairTest(test_param,
+ test_params['lat_percentile'],
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ gen_machine,
+ sut_machine, background_machines)
+ elif test_param['test'] in ['irqtest']:
+ test = IrqTest(test_param,
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ self.machines)
+ elif test_param['test'] in ['warmuptest']:
+ test = WarmupTest(test_param,
+ gen_machine)
+ else:
+ RapidLog.debug('Test name ({}) is not valid:'.format(
+ test_param['test']))
+ single_test_result, result_details = test.run()
+ result = result + single_test_result
+ for machine in self.machines:
+ machine.close_prox()
+ concurrent.futures.wait(self.future_to_prox,
+ return_when=ALL_COMPLETED)
+ except (ConnectionError, KeyboardInterrupt) as e:
+ result = result_details = None
+ socket_executor.shutdown(wait=False)
+ socket_executor._threads.clear()
+ prox_executor.shutdown(wait=False)
+ prox_executor._threads.clear()
+ concurrent.futures.thread._threads_queues.clear()
+ RapidLog.error("Test interrupted: {} {}".format(
+ type(e).__name__,e))
return (result, result_details)
def main():