summaryrefslogtreecommitdiffstats
path: root/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
diff options
context:
space:
mode:
Diffstat (limited to 'VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py')
-rw-r--r--VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py183
1 files changed, 67 insertions, 116 deletions
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
index 5fc98db3..8754ebc4 100644
--- a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
+++ b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
@@ -25,40 +25,39 @@ import os
import time
import subprocess
import socket
-from rapid_log import RapidLog
+from rapid_log import RapidLog
+from rapid_sshclient import SSHClient
class prox_ctrl(object):
- def __init__(self, ip, key=None, user=None):
+ def __init__(self, ip, key=None, user=None, password = None):
self._ip = ip
self._key = key
self._user = user
+ self._password = password
self._proxsock = []
+ self._sshclient = SSHClient(ip = ip, user = user, password = password,
+ rsa_private_key = key, timeout = None)
def ip(self):
return self._ip
- def test_connect(self):
- """Simply try to run 'true' over ssh on remote system.
- On failure, raise RuntimeWarning exception when possibly worth
- retrying, and raise RuntimeError exception otherwise.
- """
- return self.run_cmd('test -e /opt/rapid/system_ready_for_rapid', True)
-
- def connect(self):
+ def test_connection(self):
attempts = 1
RapidLog.debug("Trying to connect to machine \
on %s, attempt: %d" % (self._ip, attempts))
while True:
try:
- self.test_connect()
- break
+ if (self.run_cmd('test -e /opt/rapid/system_ready_for_rapid \
+ && echo exists')):
+ break
+ time.sleep(2)
except RuntimeWarning as ex:
+ RapidLog.debug("RuntimeWarning %d:\n%s"
+ % (ex.returncode, ex.output.strip()))
attempts += 1
if attempts > 20:
RapidLog.exception("Failed to connect to instance after %d\
attempts:\n%s" % (attempts, ex))
- raise Exception("Failed to connect to instance after %d \
- attempts:\n%s" % (attempts, ex))
time.sleep(2)
RapidLog.debug("Trying to connect to machine \
on %s, attempt: %d" % (self._ip, attempts))
@@ -77,8 +76,6 @@ class prox_ctrl(object):
if attempts > 20:
RapidLog.exception("Failed to connect to PROX on %s after %d \
attempts" % (self._ip, attempts))
- raise Exception("Failed to connect to PROX on %s after %d \
- attempts" % (self._ip, attempts))
time.sleep(2)
RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
attempt: %d" % (self._ip, attempts))
@@ -89,21 +86,9 @@ class prox_ctrl(object):
for sock in self._proxsock:
sock.quit()
- def run_cmd(self, command, _connect=False):
- """Execute command over ssh on remote system.
- Wait for remote command completion.
- Return command output (combined stdout and stderr).
- _connect argument is reserved for connect() method.
- """
- cmd = self._build_ssh(command)
- try:
- return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- except subprocess.CalledProcessError as ex:
- #if _connect and ex.returncode == 255:
- if _connect:
- raise RuntimeWarning(ex.output.strip())
- raise RuntimeError('ssh returned exit status %d:\n%s'
- % (ex.returncode, ex.output.strip()))
+ def run_cmd(self, command):
+ self._sshclient.run_cmd(command)
+ return self._sshclient.get_output()
def prox_sock(self, port=8474):
"""Connect to the PROX instance on remote system.
@@ -119,64 +104,13 @@ class prox_ctrl(object):
return None
def scp_put(self, src, dst):
- """Copy src file from local system to dst on remote system."""
- cmd = [ 'scp',
- '-B',
- '-oStrictHostKeyChecking=no',
- '-oUserKnownHostsFile=/dev/null',
- '-oLogLevel=ERROR' ]
- if self._key is not None:
- cmd.extend(['-i', self._key])
- cmd.append(src)
- remote = ''
- if self._user is not None:
- remote += self._user + '@'
- remote += self._ip + ':' + dst
- cmd.append(remote)
- try:
- # Actually ignore output on success, but capture stderr on failure
- subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- except subprocess.CalledProcessError as ex:
- raise RuntimeError('scp returned exit status %d:\n%s'
- % (ex.returncode, ex.output.strip()))
+ self._sshclient.scp_put(src, dst)
+ RapidLog.info("Copying from {} to {}:{}".format(src, self._ip, dst))
def scp_get(self, src, dst):
- """Copy src file from remote system to dst on local system."""
- cmd = [ 'scp',
- '-B',
- '-oStrictHostKeyChecking=no',
- '-oUserKnownHostsFile=/dev/null',
- '-oLogLevel=ERROR' ]
- if self._key is not None:
- cmd.extend(['-i', self._key])
- remote = ''
- if self._user is not None:
- remote += self._user + '@'
- remote += self._ip + ':/home/' + self._user + src
- cmd.append(remote)
- cmd.append(dst)
- try:
- # Actually ignore output on success, but capture stderr on failure
- subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- except subprocess.CalledProcessError as ex:
- raise RuntimeError('scp returned exit status %d:\n%s'
- % (ex.returncode, ex.output.strip()))
-
- def _build_ssh(self, command):
- cmd = [ 'ssh',
- '-oBatchMode=yes',
- '-oStrictHostKeyChecking=no',
- '-oUserKnownHostsFile=/dev/null',
- '-oLogLevel=ERROR' ]
- if self._key is not None:
- cmd.extend(['-i', self._key])
- remote = ''
- if self._user is not None:
- remote += self._user + '@'
- remote += self._ip
- cmd.append(remote)
- cmd.append(command)
- return cmd
+ self._sshclient.scp_get('/home/' + self._user + src, dst)
+ RapidLog.info("Copying from {}:/home/{}{} to {}".format(self._ip,
+ self._user, src, dst))
class prox_sock(object):
def __init__(self, sock):
@@ -203,10 +137,14 @@ class prox_sock(object):
self._send('reset stats')
def lat_stats(self, cores, tasks=[0]):
- min_lat = 999999999
- max_lat = avg_lat = 0
+ result = {}
+ result['lat_min'] = 999999999
+ result['lat_max'] = result['lat_avg'] = 0
+ result['buckets'] = [0] * 128
+ result['mis_ordered'] = 0
+ result['extent'] = 0
+ result['duplicate'] = 0
number_tasks_returning_stats = 0
- buckets = [0] * 128
self._send('lat all stats %s %s' % (','.join(map(str, cores)),
','.join(map(str, tasks))))
for core in cores:
@@ -219,37 +157,42 @@ class prox_sock(object):
(potential incompatibility between scripts and PROX)")
raise Exception("lat stats error")
number_tasks_returning_stats += 1
- min_lat = min(int(stats[0]),min_lat)
- max_lat = max(int(stats[1]),max_lat)
- avg_lat += int(stats[2])
+ result['lat_min'] = min(int(stats[0]),result['lat_min'])
+ result['lat_max'] = max(int(stats[1]),result['lat_max'])
+ result['lat_avg'] += int(stats[2])
#min_since begin = int(stats[3])
#max_since_begin = int(stats[4])
- tsc = int(stats[5]) # Taking the last tsc as the timestamp since
- # PROX will return the same tsc for each
- # core/task combination
- hz = int(stats[6])
+ result['lat_tsc'] = int(stats[5])
+ # Taking the last tsc as the timestamp since
+ # PROX will return the same tsc for each
+ # core/task combination
+ result['lat_hz'] = int(stats[6])
#coreid = int(stats[7])
#taskid = int(stats[8])
+ result['mis_ordered'] += int(stats[9])
+ result['extent'] += int(stats[10])
+ result['duplicate'] += int(stats[11])
stats = self._recv().split(':')
if stats[0].startswith('error'):
RapidLog.critical("lat stats error: unexpected lat bucket \
reply (potential incompatibility between scripts \
and PROX)")
raise Exception("lat bucket reply error")
- buckets[0] = int(stats[1])
+ result['buckets'][0] = int(stats[1])
for i in range(1, 128):
stats = self._recv().split(':')
- buckets[i] = int(stats[1])
- avg_lat = old_div(avg_lat,number_tasks_returning_stats)
+ result['buckets'][i] += int(stats[1])
+ result['lat_avg'] = old_div(result['lat_avg'],
+ number_tasks_returning_stats)
self._send('stats latency(0).used')
used = float(self._recv())
self._send('stats latency(0).total')
total = float(self._recv())
- return (min_lat, max_lat, avg_lat, (old_div(used,total)), tsc, hz,
- buckets)
+ result['lat_used'] = old_div(used,total)
+ return (result)
def irq_stats(self, core, bucket, task=0):
- self._send('stats task.core(%s).task(%s).irq(%s)' %
+ self._send('stats task.core(%s).task(%s).irq(%s)' %
(core, task, bucket))
stats = self._recv().split(',')
return int(stats[0])
@@ -263,12 +206,12 @@ class prox_sock(object):
def core_stats(self, cores, tasks=[0]):
rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
- self._send('dp core stats %s %s' % (','.join(map(str, cores)),
+ self._send('dp core stats %s %s' % (','.join(map(str, cores)),
','.join(map(str, tasks))))
for core in cores:
for task in tasks:
stats = self._recv().split(',')
- if stats[0].startswith('error'):
+ if stats[0].startswith('error'):
if stats[0].startswith('error: invalid syntax'):
RapidLog.critical("dp core stats error: unexpected \
invalid syntax (potential incompatibility \
@@ -289,7 +232,7 @@ class prox_sock(object):
rx = tx = port_id = tsc = no_mbufs = errors = 0
self._send('multi port stats %s' % (','.join(map(str, ports))))
result = self._recv().split(';')
- if result[0].startswith('error'):
+ if result[0].startswith('error'):
RapidLog.critical("multi port stats error: unexpected invalid \
syntax (potential incompatibility between scripts and \
PROX)")
@@ -305,19 +248,19 @@ class prox_sock(object):
return rx, tx, no_mbufs, errors, tsc
def set_random(self, cores, task, offset, mask, length):
- self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
+ self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
task, offset, mask, length))
def set_size(self, cores, task, pkt_size):
- self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
+ self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
pkt_size))
def set_imix(self, cores, task, imix):
- self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
+ self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
','.join(map(str,imix))))
def set_value(self, cores, task, offset, value, length):
- self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
+ self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
task, offset, value, length))
def quit_prox(self):
@@ -327,16 +270,24 @@ class prox_sock(object):
"""Append LF and send command to the PROX instance."""
if self._sock is None:
raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
- self._sock.sendall(cmd.encode() + b'\n')
+ try:
+ self._sock.sendall(cmd.encode() + b'\n')
+ except ConnectionResetError as e:
+ RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+ raise
def _recv(self):
"""Receive response from PROX instance, return it with LF removed."""
if self._sock is None:
raise RuntimeError("PROX socket closed, cannot receive anymore")
- pos = self._rcvd.find(b'\n')
- while pos == -1:
- self._rcvd += self._sock.recv(256)
+ try:
pos = self._rcvd.find(b'\n')
- rsp = self._rcvd[:pos]
- self._rcvd = self._rcvd[pos+1:]
+ while pos == -1:
+ self._rcvd += self._sock.recv(256)
+ pos = self._rcvd.find(b'\n')
+ rsp = self._rcvd[:pos]
+ self._rcvd = self._rcvd[pos+1:]
+ except ConnectionResetError as e:
+ RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+ raise
return rsp.decode()