diff options
Diffstat (limited to 'VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py')
-rw-r--r-- | VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py | 183 |
1 files changed, 67 insertions, 116 deletions
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py index 5fc98db3..8754ebc4 100644 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py @@ -25,40 +25,39 @@ import os import time import subprocess import socket -from rapid_log import RapidLog +from rapid_log import RapidLog +from rapid_sshclient import SSHClient class prox_ctrl(object): - def __init__(self, ip, key=None, user=None): + def __init__(self, ip, key=None, user=None, password = None): self._ip = ip self._key = key self._user = user + self._password = password self._proxsock = [] + self._sshclient = SSHClient(ip = ip, user = user, password = password, + rsa_private_key = key, timeout = None) def ip(self): return self._ip - def test_connect(self): - """Simply try to run 'true' over ssh on remote system. - On failure, raise RuntimeWarning exception when possibly worth - retrying, and raise RuntimeError exception otherwise. - """ - return self.run_cmd('test -e /opt/rapid/system_ready_for_rapid', True) - - def connect(self): + def test_connection(self): attempts = 1 RapidLog.debug("Trying to connect to machine \ on %s, attempt: %d" % (self._ip, attempts)) while True: try: - self.test_connect() - break + if (self.run_cmd('test -e /opt/rapid/system_ready_for_rapid \ + && echo exists')): + break + time.sleep(2) except RuntimeWarning as ex: + RapidLog.debug("RuntimeWarning %d:\n%s" + % (ex.returncode, ex.output.strip())) attempts += 1 if attempts > 20: RapidLog.exception("Failed to connect to instance after %d\ attempts:\n%s" % (attempts, ex)) - raise Exception("Failed to connect to instance after %d \ - attempts:\n%s" % (attempts, ex)) time.sleep(2) RapidLog.debug("Trying to connect to machine \ on %s, attempt: %d" % (self._ip, attempts)) @@ -77,8 +76,6 @@ class prox_ctrl(object): if attempts > 20: RapidLog.exception("Failed to connect to PROX on %s after %d \ attempts" % (self._ip, attempts)) - raise Exception("Failed to connect to PROX on %s after %d \ - attempts" % (self._ip, attempts)) time.sleep(2) RapidLog.debug("Trying to connect to PROX (just launched) on %s, \ attempt: %d" % (self._ip, attempts)) @@ -89,21 +86,9 @@ class prox_ctrl(object): for sock in self._proxsock: sock.quit() - def run_cmd(self, command, _connect=False): - """Execute command over ssh on remote system. - Wait for remote command completion. - Return command output (combined stdout and stderr). - _connect argument is reserved for connect() method. - """ - cmd = self._build_ssh(command) - try: - return subprocess.check_output(cmd, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as ex: - #if _connect and ex.returncode == 255: - if _connect: - raise RuntimeWarning(ex.output.strip()) - raise RuntimeError('ssh returned exit status %d:\n%s' - % (ex.returncode, ex.output.strip())) + def run_cmd(self, command): + self._sshclient.run_cmd(command) + return self._sshclient.get_output() def prox_sock(self, port=8474): """Connect to the PROX instance on remote system. @@ -119,64 +104,13 @@ class prox_ctrl(object): return None def scp_put(self, src, dst): - """Copy src file from local system to dst on remote system.""" - cmd = [ 'scp', - '-B', - '-oStrictHostKeyChecking=no', - '-oUserKnownHostsFile=/dev/null', - '-oLogLevel=ERROR' ] - if self._key is not None: - cmd.extend(['-i', self._key]) - cmd.append(src) - remote = '' - if self._user is not None: - remote += self._user + '@' - remote += self._ip + ':' + dst - cmd.append(remote) - try: - # Actually ignore output on success, but capture stderr on failure - subprocess.check_output(cmd, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as ex: - raise RuntimeError('scp returned exit status %d:\n%s' - % (ex.returncode, ex.output.strip())) + self._sshclient.scp_put(src, dst) + RapidLog.info("Copying from {} to {}:{}".format(src, self._ip, dst)) def scp_get(self, src, dst): - """Copy src file from remote system to dst on local system.""" - cmd = [ 'scp', - '-B', - '-oStrictHostKeyChecking=no', - '-oUserKnownHostsFile=/dev/null', - '-oLogLevel=ERROR' ] - if self._key is not None: - cmd.extend(['-i', self._key]) - remote = '' - if self._user is not None: - remote += self._user + '@' - remote += self._ip + ':/home/' + self._user + src - cmd.append(remote) - cmd.append(dst) - try: - # Actually ignore output on success, but capture stderr on failure - subprocess.check_output(cmd, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as ex: - raise RuntimeError('scp returned exit status %d:\n%s' - % (ex.returncode, ex.output.strip())) - - def _build_ssh(self, command): - cmd = [ 'ssh', - '-oBatchMode=yes', - '-oStrictHostKeyChecking=no', - '-oUserKnownHostsFile=/dev/null', - '-oLogLevel=ERROR' ] - if self._key is not None: - cmd.extend(['-i', self._key]) - remote = '' - if self._user is not None: - remote += self._user + '@' - remote += self._ip - cmd.append(remote) - cmd.append(command) - return cmd + self._sshclient.scp_get('/home/' + self._user + src, dst) + RapidLog.info("Copying from {}:/home/{}{} to {}".format(self._ip, + self._user, src, dst)) class prox_sock(object): def __init__(self, sock): @@ -203,10 +137,14 @@ class prox_sock(object): self._send('reset stats') def lat_stats(self, cores, tasks=[0]): - min_lat = 999999999 - max_lat = avg_lat = 0 + result = {} + result['lat_min'] = 999999999 + result['lat_max'] = result['lat_avg'] = 0 + result['buckets'] = [0] * 128 + result['mis_ordered'] = 0 + result['extent'] = 0 + result['duplicate'] = 0 number_tasks_returning_stats = 0 - buckets = [0] * 128 self._send('lat all stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks)))) for core in cores: @@ -219,37 +157,42 @@ class prox_sock(object): (potential incompatibility between scripts and PROX)") raise Exception("lat stats error") number_tasks_returning_stats += 1 - min_lat = min(int(stats[0]),min_lat) - max_lat = max(int(stats[1]),max_lat) - avg_lat += int(stats[2]) + result['lat_min'] = min(int(stats[0]),result['lat_min']) + result['lat_max'] = max(int(stats[1]),result['lat_max']) + result['lat_avg'] += int(stats[2]) #min_since begin = int(stats[3]) #max_since_begin = int(stats[4]) - tsc = int(stats[5]) # Taking the last tsc as the timestamp since - # PROX will return the same tsc for each - # core/task combination - hz = int(stats[6]) + result['lat_tsc'] = int(stats[5]) + # Taking the last tsc as the timestamp since + # PROX will return the same tsc for each + # core/task combination + result['lat_hz'] = int(stats[6]) #coreid = int(stats[7]) #taskid = int(stats[8]) + result['mis_ordered'] += int(stats[9]) + result['extent'] += int(stats[10]) + result['duplicate'] += int(stats[11]) stats = self._recv().split(':') if stats[0].startswith('error'): RapidLog.critical("lat stats error: unexpected lat bucket \ reply (potential incompatibility between scripts \ and PROX)") raise Exception("lat bucket reply error") - buckets[0] = int(stats[1]) + result['buckets'][0] = int(stats[1]) for i in range(1, 128): stats = self._recv().split(':') - buckets[i] = int(stats[1]) - avg_lat = old_div(avg_lat,number_tasks_returning_stats) + result['buckets'][i] += int(stats[1]) + result['lat_avg'] = old_div(result['lat_avg'], + number_tasks_returning_stats) self._send('stats latency(0).used') used = float(self._recv()) self._send('stats latency(0).total') total = float(self._recv()) - return (min_lat, max_lat, avg_lat, (old_div(used,total)), tsc, hz, - buckets) + result['lat_used'] = old_div(used,total) + return (result) def irq_stats(self, core, bucket, task=0): - self._send('stats task.core(%s).task(%s).irq(%s)' % + self._send('stats task.core(%s).task(%s).irq(%s)' % (core, task, bucket)) stats = self._recv().split(',') return int(stats[0]) @@ -263,12 +206,12 @@ class prox_sock(object): def core_stats(self, cores, tasks=[0]): rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0 - self._send('dp core stats %s %s' % (','.join(map(str, cores)), + self._send('dp core stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks)))) for core in cores: for task in tasks: stats = self._recv().split(',') - if stats[0].startswith('error'): + if stats[0].startswith('error'): if stats[0].startswith('error: invalid syntax'): RapidLog.critical("dp core stats error: unexpected \ invalid syntax (potential incompatibility \ @@ -289,7 +232,7 @@ class prox_sock(object): rx = tx = port_id = tsc = no_mbufs = errors = 0 self._send('multi port stats %s' % (','.join(map(str, ports)))) result = self._recv().split(';') - if result[0].startswith('error'): + if result[0].startswith('error'): RapidLog.critical("multi port stats error: unexpected invalid \ syntax (potential incompatibility between scripts and \ PROX)") @@ -305,19 +248,19 @@ class prox_sock(object): return rx, tx, no_mbufs, errors, tsc def set_random(self, cores, task, offset, mask, length): - self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), + self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, mask, length)) def set_size(self, cores, task, pkt_size): - self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, + self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, pkt_size)) def set_imix(self, cores, task, imix): - self._send('imix %s %s %s' % (','.join(map(str, cores)), task, + self._send('imix %s %s %s' % (','.join(map(str, cores)), task, ','.join(map(str,imix)))) def set_value(self, cores, task, offset, value, length): - self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), + self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, value, length)) def quit_prox(self): @@ -327,16 +270,24 @@ class prox_sock(object): """Append LF and send command to the PROX instance.""" if self._sock is None: raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd) - self._sock.sendall(cmd.encode() + b'\n') + try: + self._sock.sendall(cmd.encode() + b'\n') + except ConnectionResetError as e: + RapidLog.error('Pipe reset by Prox instance: traffic too high?') + raise def _recv(self): """Receive response from PROX instance, return it with LF removed.""" if self._sock is None: raise RuntimeError("PROX socket closed, cannot receive anymore") - pos = self._rcvd.find(b'\n') - while pos == -1: - self._rcvd += self._sock.recv(256) + try: pos = self._rcvd.find(b'\n') - rsp = self._rcvd[:pos] - self._rcvd = self._rcvd[pos+1:] + while pos == -1: + self._rcvd += self._sock.recv(256) + pos = self._rcvd.find(b'\n') + rsp = self._rcvd[:pos] + self._rcvd = self._rcvd[pos+1:] + except ConnectionResetError as e: + RapidLog.error('Pipe reset by Prox instance: traffic too high?') + raise return rsp.decode() |