summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLuc Provoost <luc.provoost@intel.com>2021-05-10 16:04:22 +0200
committerLuc Provoost <luc.provoost@intel.com>2021-05-12 19:07:47 +0000
commitef3d8d2223899f70d93be3ffa5ff14e7ad2d421f (patch)
tree157937a759cbce15c76020c6d1ca5ba116b7047d
parent894f62216ca3c97ef0c1c09b9475459b9324b1bb (diff)
Fix latency buckets with multiple lat cores
The bucket data of the last latency core was taken into account when collecting the latency stats. We are now adding the bucket data for all cores. Also changed the error reporting when PROX instance is breaking the pipe connection (e.g. when the dataplane network is overloaded). Cleaning up some trailing spaces in pox_ctrl.py Change-Id: I09ba01ac65e7e4e9ff03ad47da83aa4f83250a67 Signed-off-by: Luc Provoost <luc.provoost@intel.com>
-rw-r--r--VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py47
-rw-r--r--VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py5
-rwxr-xr-xVNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py112
3 files changed, 98 insertions, 66 deletions
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
index 8cd2e3f2..40375c54 100644
--- a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
+++ b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
@@ -25,7 +25,7 @@ import os
import time
import subprocess
import socket
-from rapid_log import RapidLog
+from rapid_log import RapidLog
class prox_ctrl(object):
def __init__(self, ip, key=None, user=None):
@@ -223,9 +223,10 @@ class prox_sock(object):
result['lat_avg'] += int(stats[2])
#min_since begin = int(stats[3])
#max_since_begin = int(stats[4])
- result['lat_tsc'] = int(stats[5]) # Taking the last tsc as the timestamp since
- # PROX will return the same tsc for each
- # core/task combination
+ result['lat_tsc'] = int(stats[5])
+ # Taking the last tsc as the timestamp since
+ # PROX will return the same tsc for each
+ # core/task combination
result['lat_hz'] = int(stats[6])
#coreid = int(stats[7])
#taskid = int(stats[8])
@@ -241,7 +242,7 @@ class prox_sock(object):
result['buckets'][0] = int(stats[1])
for i in range(1, 128):
stats = self._recv().split(':')
- result['buckets'][i] = int(stats[1])
+ result['buckets'][i] += int(stats[1])
result['lat_avg'] = old_div(result['lat_avg'],
number_tasks_returning_stats)
self._send('stats latency(0).used')
@@ -252,7 +253,7 @@ class prox_sock(object):
return (result)
def irq_stats(self, core, bucket, task=0):
- self._send('stats task.core(%s).task(%s).irq(%s)' %
+ self._send('stats task.core(%s).task(%s).irq(%s)' %
(core, task, bucket))
stats = self._recv().split(',')
return int(stats[0])
@@ -266,12 +267,12 @@ class prox_sock(object):
def core_stats(self, cores, tasks=[0]):
rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
- self._send('dp core stats %s %s' % (','.join(map(str, cores)),
+ self._send('dp core stats %s %s' % (','.join(map(str, cores)),
','.join(map(str, tasks))))
for core in cores:
for task in tasks:
stats = self._recv().split(',')
- if stats[0].startswith('error'):
+ if stats[0].startswith('error'):
if stats[0].startswith('error: invalid syntax'):
RapidLog.critical("dp core stats error: unexpected \
invalid syntax (potential incompatibility \
@@ -292,7 +293,7 @@ class prox_sock(object):
rx = tx = port_id = tsc = no_mbufs = errors = 0
self._send('multi port stats %s' % (','.join(map(str, ports))))
result = self._recv().split(';')
- if result[0].startswith('error'):
+ if result[0].startswith('error'):
RapidLog.critical("multi port stats error: unexpected invalid \
syntax (potential incompatibility between scripts and \
PROX)")
@@ -308,19 +309,19 @@ class prox_sock(object):
return rx, tx, no_mbufs, errors, tsc
def set_random(self, cores, task, offset, mask, length):
- self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
+ self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
task, offset, mask, length))
def set_size(self, cores, task, pkt_size):
- self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
+ self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
pkt_size))
def set_imix(self, cores, task, imix):
- self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
+ self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
','.join(map(str,imix))))
def set_value(self, cores, task, offset, value, length):
- self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
+ self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
task, offset, value, length))
def quit_prox(self):
@@ -330,16 +331,24 @@ class prox_sock(object):
"""Append LF and send command to the PROX instance."""
if self._sock is None:
raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
- self._sock.sendall(cmd.encode() + b'\n')
+ try:
+ self._sock.sendall(cmd.encode() + b'\n')
+ except ConnectionResetError as e:
+ RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+ raise
def _recv(self):
"""Receive response from PROX instance, return it with LF removed."""
if self._sock is None:
raise RuntimeError("PROX socket closed, cannot receive anymore")
- pos = self._rcvd.find(b'\n')
- while pos == -1:
- self._rcvd += self._sock.recv(256)
+ try:
pos = self._rcvd.find(b'\n')
- rsp = self._rcvd[:pos]
- self._rcvd = self._rcvd[pos+1:]
+ while pos == -1:
+ self._rcvd += self._sock.recv(256)
+ pos = self._rcvd.find(b'\n')
+ rsp = self._rcvd[:pos]
+ self._rcvd = self._rcvd[pos+1:]
+ except ConnectionResetError as e:
+ RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+ raise
return rsp.decode()
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py
index f453c574..1ad54273 100644
--- a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py
+++ b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py
@@ -120,11 +120,12 @@ class RapidLog(object):
@staticmethod
def exception(exception_info):
RapidLog.log.exception(exception_info)
- raise Exception(exception_info)
+ exit(1)
+ @staticmethod
def critical(critical_info):
RapidLog.log.critical(critical_info)
- raise Exception(critical_info)
+ exit(1)
@staticmethod
def error(error_info):
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py
index 33fb8dfc..5f78ec01 100755
--- a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py
+++ b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py
@@ -87,52 +87,74 @@ class RapidTestManager(object):
if machine_params['prox_socket']:
sut_machine = machine
self.machines.append(machine)
- prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines))
- self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines}
- if configonly:
- concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED)
- sys.exit()
- with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) as executor:
- future_to_connect_prox = {executor.submit(machine.connect_prox): machine for machine in self.machines}
+ try:
+ prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines))
+ self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines}
+ if configonly:
+ concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED)
+ sys.exit()
+ socket_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines))
+ future_to_connect_prox = {socket_executor.submit(machine.connect_prox): machine for machine in self.machines}
concurrent.futures.wait(future_to_connect_prox,return_when=ALL_COMPLETED)
- result = 0
- for test_param in test_params['tests']:
- RapidLog.info(test_param['test'])
- if test_param['test'] in ['flowsizetest', 'TST009test',
- 'fixed_rate', 'increment_till_fail']:
- test = FlowSizeTest(test_param, test_params['lat_percentile'],
- test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], gen_machine,
- sut_machine, background_machines)
- elif test_param['test'] in ['corestatstest']:
- test = CoreStatsTest(test_param, test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], self.machines)
- elif test_param['test'] in ['portstatstest']:
- test = PortStatsTest(test_param, test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], self.machines)
- elif test_param['test'] in ['impairtest']:
- test = ImpairTest(test_param, test_params['lat_percentile'],
- test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], gen_machine,
- sut_machine, background_machines)
- elif test_param['test'] in ['irqtest']:
- test = IrqTest(test_param, test_params['runtime'],
- test_params['TestName'],
- test_params['environment_file'], self.machines)
- elif test_param['test'] in ['warmuptest']:
- test = WarmupTest(test_param, gen_machine)
- else:
- RapidLog.debug('Test name ({}) is not valid:'.format(
- test_param['test']))
- single_test_result, result_details = test.run()
- result = result + single_test_result
- for machine in self.machines:
- machine.close_prox()
- concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED)
+ result = 0
+ for test_param in test_params['tests']:
+ RapidLog.info(test_param['test'])
+ if test_param['test'] in ['flowsizetest', 'TST009test',
+ 'fixed_rate', 'increment_till_fail']:
+ test = FlowSizeTest(test_param,
+ test_params['lat_percentile'],
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ gen_machine,
+ sut_machine, background_machines)
+ elif test_param['test'] in ['corestatstest']:
+ test = CoreStatsTest(test_param,
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ self.machines)
+ elif test_param['test'] in ['portstatstest']:
+ test = PortStatsTest(test_param,
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ self.machines)
+ elif test_param['test'] in ['impairtest']:
+ test = ImpairTest(test_param,
+ test_params['lat_percentile'],
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ gen_machine,
+ sut_machine, background_machines)
+ elif test_param['test'] in ['irqtest']:
+ test = IrqTest(test_param,
+ test_params['runtime'],
+ test_params['TestName'],
+ test_params['environment_file'],
+ self.machines)
+ elif test_param['test'] in ['warmuptest']:
+ test = WarmupTest(test_param,
+ gen_machine)
+ else:
+ RapidLog.debug('Test name ({}) is not valid:'.format(
+ test_param['test']))
+ single_test_result, result_details = test.run()
+ result = result + single_test_result
+ for machine in self.machines:
+ machine.close_prox()
+ concurrent.futures.wait(self.future_to_prox,
+ return_when=ALL_COMPLETED)
+ except (ConnectionError, KeyboardInterrupt) as e:
+ result = result_details = None
+ socket_executor.shutdown(wait=False)
+ socket_executor._threads.clear()
+ prox_executor.shutdown(wait=False)
+ prox_executor._threads.clear()
+ concurrent.futures.thread._threads_queues.clear()
+ RapidLog.error("Test interrupted: {} {}".format(
+ type(e).__name__,e))
return (result, result_details)
def main():