diff options
Diffstat (limited to 'VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py')
-rw-r--r-- | VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py | 392 |
1 files changed, 178 insertions, 214 deletions
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py index ba21913c..8754ebc4 100644 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py @@ -1,5 +1,5 @@ ## -## Copyright (c) 2010-2019 Intel Corporation +## Copyright (c) 2010-2020 Intel Corporation ## ## Licensed under the Apache License, Version 2.0 (the "License"); ## you may not use this file except in compliance with the License. @@ -15,94 +15,80 @@ ## from __future__ import print_function +from __future__ import division +from builtins import map +from builtins import range +from past.utils import old_div +from builtins import object import os +import time import subprocess import socket +from rapid_log import RapidLog +from rapid_sshclient import SSHClient class prox_ctrl(object): - def __init__(self, ip, key=None, user=None): + def __init__(self, ip, key=None, user=None, password = None): self._ip = ip self._key = key self._user = user - self._children = [] + self._password = password self._proxsock = [] + self._sshclient = SSHClient(ip = ip, user = user, password = password, + rsa_private_key = key, timeout = None) def ip(self): return self._ip - def connect(self): - """Simply try to run 'true' over ssh on remote system. - On failure, raise RuntimeWarning exception when possibly worth - retrying, and raise RuntimeError exception otherwise. - """ - return self.run_cmd('true', True) + def test_connection(self): + attempts = 1 + RapidLog.debug("Trying to connect to machine \ + on %s, attempt: %d" % (self._ip, attempts)) + while True: + try: + if (self.run_cmd('test -e /opt/rapid/system_ready_for_rapid \ + && echo exists')): + break + time.sleep(2) + except RuntimeWarning as ex: + RapidLog.debug("RuntimeWarning %d:\n%s" + % (ex.returncode, ex.output.strip())) + attempts += 1 + if attempts > 20: + RapidLog.exception("Failed to connect to instance after %d\ + attempts:\n%s" % (attempts, ex)) + time.sleep(2) + RapidLog.debug("Trying to connect to machine \ + on %s, attempt: %d" % (self._ip, attempts)) + RapidLog.debug("Connected to machine on %s" % self._ip) - def close(self): - """Must be called before program termination.""" -# for prox in self._proxsock: -# prox.quit() - children = len(self._children) - if children == 0: - return - if children > 1: - print('Waiting for %d child processes to complete ...' % children) - for child in self._children: - ret = os.waitpid(child[0], os.WNOHANG) - if ret[0] == 0: - print("Waiting for child process '%s' to complete ..." % child[1]) - ret = os.waitpid(child[0], 0) - rc = ret[1] - if os.WIFEXITED(rc): - if os.WEXITSTATUS(rc) == 0: - print("Child process '%s' completed successfully" % child[1]) - else: - print("Child process '%s' returned exit status %d" % ( - child[1], os.WEXITSTATUS(rc))) - elif os.WIFSIGNALED(rc): - print("Child process '%s' exited on signal %d" % ( - child[1], os.WTERMSIG(rc))) - else: - print("Wait status for child process '%s' is 0x%04x" % ( - child[1], rc)) + def connect_socket(self): + attempts = 1 + RapidLog.debug("Trying to connect to PROX (just launched) on %s, \ + attempt: %d" % (self._ip, attempts)) + sock = None + while True: + sock = self.prox_sock() + if sock is not None: + break + attempts += 1 + if attempts > 20: + RapidLog.exception("Failed to connect to PROX on %s after %d \ + attempts" % (self._ip, attempts)) + time.sleep(2) + RapidLog.debug("Trying to connect to PROX (just launched) on %s, \ + attempt: %d" % (self._ip, attempts)) + RapidLog.info("Connected to PROX on %s" % self._ip) + return sock - def run_cmd(self, command, _connect=False): - """Execute command over ssh on remote system. - Wait for remote command completion. - Return command output (combined stdout and stderr). - _connect argument is reserved for connect() method. - """ - cmd = self._build_ssh(command) - try: - return subprocess.check_output(cmd, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as ex: - if _connect and ex.returncode == 255: - raise RuntimeWarning(ex.output.strip()) - raise RuntimeError('ssh returned exit status %d:\n%s' - % (ex.returncode, ex.output.strip())) + def close(self): + for sock in self._proxsock: + sock.quit() - def fork_cmd(self, command, name=None): - """Execute command over ssh on remote system, in a child process. - Do not wait for remote command completion. - Return child process id. - """ - if name is None: - name = command - cmd = self._build_ssh(command) - pid = os.fork() - if (pid != 0): - # In the parent process - self._children.append((pid, name)) - return pid - # In the child process: use os._exit to terminate - try: - # Actually ignore output on success, but capture stderr on failure - subprocess.check_output(cmd, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as ex: - raise RuntimeError("Child process '%s' failed:\n" - 'ssh returned exit status %d:\n%s' - % (name, ex.returncode, ex.output.strip())) - os._exit(0) + def run_cmd(self, command): + self._sshclient.run_cmd(command) + return self._sshclient.get_output() def prox_sock(self, port=8474): """Connect to the PROX instance on remote system. @@ -118,51 +104,21 @@ class prox_ctrl(object): return None def scp_put(self, src, dst): - """Copy src file from local system to dst on remote system.""" - cmd = [ 'scp', - '-B', - '-oStrictHostKeyChecking=no', - '-oUserKnownHostsFile=/dev/null', - '-oLogLevel=ERROR' ] - if self._key is not None: - cmd.extend(['-i', self._key]) - cmd.append(src) - remote = '' - if self._user is not None: - remote += self._user + '@' - remote += self._ip + ':' + dst - cmd.append(remote) - try: - # Actually ignore output on success, but capture stderr on failure - subprocess.check_output(cmd, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as ex: - raise RuntimeError('scp returned exit status %d:\n%s' - % (ex.returncode, ex.output.strip())) + self._sshclient.scp_put(src, dst) + RapidLog.info("Copying from {} to {}:{}".format(src, self._ip, dst)) - def _build_ssh(self, command): - cmd = [ 'ssh', - '-oBatchMode=yes', - '-oStrictHostKeyChecking=no', - '-oUserKnownHostsFile=/dev/null', - '-oLogLevel=ERROR' ] - if self._key is not None: - cmd.extend(['-i', self._key]) - remote = '' - if self._user is not None: - remote += self._user + '@' - remote += self._ip - cmd.append(remote) - cmd.append(command) - return cmd + def scp_get(self, src, dst): + self._sshclient.scp_get('/home/' + self._user + src, dst) + RapidLog.info("Copying from {}:/home/{}{} to {}".format(self._ip, + self._user, src, dst)) class prox_sock(object): def __init__(self, sock): self._sock = sock self._rcvd = b'' - def quit(self): + def __del__(self): if self._sock is not None: - self._send('quit') self._sock.close() self._sock = None @@ -174,84 +130,70 @@ class prox_sock(object): def speed(self, speed, cores, tasks=[0]): for core in cores: - for task in tasks: - self._send('speed %s %s %s' % (core, task, speed)) + for task in tasks: + self._send('speed %s %s %s' % (core, task, speed)) def reset_stats(self): self._send('reset stats') def lat_stats(self, cores, tasks=[0]): - min_lat = 999999999 - max_lat = avg_lat = 0 - number_tasks_returning_stats = 0 - buckets = [0] * 128 - self._send('lat all stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks)))) + result = {} + result['lat_min'] = 999999999 + result['lat_max'] = result['lat_avg'] = 0 + result['buckets'] = [0] * 128 + result['mis_ordered'] = 0 + result['extent'] = 0 + result['duplicate'] = 0 + number_tasks_returning_stats = 0 + self._send('lat all stats %s %s' % (','.join(map(str, cores)), + ','.join(map(str, tasks)))) for core in cores: - for task in tasks: - stats = self._recv().split(',') - if 'is not measuring' in stats[0]: - continue - if stats[0].startswith('error'): - log.critical("lat stats error: unexpected reply from PROX (potential incompatibility between scripts and PROX)") - raise Exception("lat stats error") - number_tasks_returning_stats += 1 - min_lat = min(int(stats[0]),min_lat) - max_lat = max(int(stats[1]),max_lat) - avg_lat += int(stats[2]) - #min_since begin = int(stats[3]) - #max_since_begin = int(stats[4]) - tsc = int(stats[5]) # Taking the last tsc as the timestamp since PROX will return the same tsc for each core/task combination - hz = int(stats[6]) - #coreid = int(stats[7]) - #taskid = int(stats[8]) - stats = self._recv().split(':') - if stats[0].startswith('error'): - log.critical("lat stats error: unexpected lat bucket reply (potential incompatibility between scripts and PROX)") - raise Exception("lat bucket reply error") - buckets[0] = int(stats[1]) - for i in range(1, 128): - stats = self._recv().split(':') - buckets[i] = int(stats[1]) - avg_lat = avg_lat/number_tasks_returning_stats + for task in tasks: + stats = self._recv().split(',') + if 'is not measuring' in stats[0]: + continue + if stats[0].startswith('error'): + RapidLog.critical("lat stats error: unexpected reply from PROX\ + (potential incompatibility between scripts and PROX)") + raise Exception("lat stats error") + number_tasks_returning_stats += 1 + result['lat_min'] = min(int(stats[0]),result['lat_min']) + result['lat_max'] = max(int(stats[1]),result['lat_max']) + result['lat_avg'] += int(stats[2]) + #min_since begin = int(stats[3]) + #max_since_begin = int(stats[4]) + result['lat_tsc'] = int(stats[5]) + # Taking the last tsc as the timestamp since + # PROX will return the same tsc for each + # core/task combination + result['lat_hz'] = int(stats[6]) + #coreid = int(stats[7]) + #taskid = int(stats[8]) + result['mis_ordered'] += int(stats[9]) + result['extent'] += int(stats[10]) + result['duplicate'] += int(stats[11]) + stats = self._recv().split(':') + if stats[0].startswith('error'): + RapidLog.critical("lat stats error: unexpected lat bucket \ + reply (potential incompatibility between scripts \ + and PROX)") + raise Exception("lat bucket reply error") + result['buckets'][0] = int(stats[1]) + for i in range(1, 128): + stats = self._recv().split(':') + result['buckets'][i] += int(stats[1]) + result['lat_avg'] = old_div(result['lat_avg'], + number_tasks_returning_stats) self._send('stats latency(0).used') used = float(self._recv()) self._send('stats latency(0).total') total = float(self._recv()) - return min_lat, max_lat, avg_lat, (used/total), tsc, hz, buckets - - def old_lat_stats(self, cores, tasks=[0]): - min_lat = 999999999 - max_lat = avg_lat = 0 - number_tasks_returning_stats = 0 - buckets = [0] * 128 - self._send('lat stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks)))) - for core in cores: - for task in tasks: - stats = self._recv().split(',') - if stats[0].startswith('error'): - if stats[0].startswith('error: invalid syntax'): - log.critical("lat stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)") - raise Exception("lat stats error") - continue - number_tasks_returning_stats += 1 - min_lat = min(int(stats[0]),min_lat) - max_lat = max(int(stats[1]),max_lat) - avg_lat += int(stats[2]) - #min_since begin = int(stats[3]) - #max_since_begin = int(stats[4]) - tsc = int(stats[5]) - hz = int(stats[6]) - #coreid = int(stats[7]) - #taskid = int(stats[8]) - avg_lat = avg_lat/number_tasks_returning_stats - self._send('stats latency(0).used') - used = float(self._recv()) - self._send('stats latency(0).total') - total = float(self._recv()) - return min_lat, max_lat, avg_lat, (used/total), tsc, hz, buckets + result['lat_used'] = old_div(used,total) + return (result) def irq_stats(self, core, bucket, task=0): - self._send('stats task.core(%s).task(%s).irq(%s)' % (core, task, bucket)) + self._send('stats task.core(%s).task(%s).irq(%s)' % + (core, task, bucket)) stats = self._recv().split(',') return int(stats[0]) @@ -259,71 +201,93 @@ class prox_sock(object): rx = tx = drop = tsc = hz = 0 self._send('show irq buckets %s %s' % (core,task)) buckets = self._recv().split(';') - buckets = buckets[:-1] + buckets = buckets[:-1] return buckets def core_stats(self, cores, tasks=[0]): rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0 - self._send('dp core stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks)))) + self._send('dp core stats %s %s' % (','.join(map(str, cores)), + ','.join(map(str, tasks)))) for core in cores: - for task in tasks: - stats = self._recv().split(',') - if stats[0].startswith('error'): - if stats[0].startswith('error: invalid syntax'): - log.critical("dp core stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)") - raise Exception("dp core stats error") - continue - rx += int(stats[0]) - tx += int(stats[1]) - rx_non_dp += int(stats[2]) - tx_non_dp += int(stats[3]) - drop += int(stats[4]) - tx_fail += int(stats[5]) - tsc = int(stats[6]) - hz = int(stats[7]) + for task in tasks: + stats = self._recv().split(',') + if stats[0].startswith('error'): + if stats[0].startswith('error: invalid syntax'): + RapidLog.critical("dp core stats error: unexpected \ + invalid syntax (potential incompatibility \ + between scripts and PROX)") + raise Exception("dp core stats error") + continue + rx += int(stats[0]) + tx += int(stats[1]) + rx_non_dp += int(stats[2]) + tx_non_dp += int(stats[3]) + drop += int(stats[4]) + tx_fail += int(stats[5]) + tsc = int(stats[6]) + hz = int(stats[7]) return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz def multi_port_stats(self, ports=[0]): rx = tx = port_id = tsc = no_mbufs = errors = 0 self._send('multi port stats %s' % (','.join(map(str, ports)))) - result = self._recv().split(';') - if result[0].startswith('error'): - log.critical("multi port stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)") - raise Exception("multi port stats error") + result = self._recv().split(';') + if result[0].startswith('error'): + RapidLog.critical("multi port stats error: unexpected invalid \ + syntax (potential incompatibility between scripts and \ + PROX)") + raise Exception("multi port stats error") for statistics in result: - stats = statistics.split(',') - port_id = int(stats[0]) - rx += int(stats[1]) - tx += int(stats[2]) - no_mbufs += int(stats[3]) - errors += int(stats[4]) - tsc = int(stats[5]) + stats = statistics.split(',') + port_id = int(stats[0]) + rx += int(stats[1]) + tx += int(stats[2]) + no_mbufs += int(stats[3]) + errors += int(stats[4]) + tsc = int(stats[5]) return rx, tx, no_mbufs, errors, tsc def set_random(self, cores, task, offset, mask, length): - self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, mask, length)) + self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), + task, offset, mask, length)) def set_size(self, cores, task, pkt_size): - self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, pkt_size)) + self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, + pkt_size)) + + def set_imix(self, cores, task, imix): + self._send('imix %s %s %s' % (','.join(map(str, cores)), task, + ','.join(map(str,imix)))) def set_value(self, cores, task, offset, value, length): - self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, value, length)) + self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), + task, offset, value, length)) + + def quit_prox(self): + self._send('quit') def _send(self, cmd): """Append LF and send command to the PROX instance.""" if self._sock is None: raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd) - self._sock.sendall(cmd.encode() + b'\n') + try: + self._sock.sendall(cmd.encode() + b'\n') + except ConnectionResetError as e: + RapidLog.error('Pipe reset by Prox instance: traffic too high?') + raise def _recv(self): - """Receive response from PROX instance, and return it with LF removed.""" + """Receive response from PROX instance, return it with LF removed.""" if self._sock is None: raise RuntimeError("PROX socket closed, cannot receive anymore") - pos = self._rcvd.find(b'\n') - while pos == -1: - self._rcvd += self._sock.recv(256) + try: pos = self._rcvd.find(b'\n') - rsp = self._rcvd[:pos] - self._rcvd = self._rcvd[pos+1:] + while pos == -1: + self._rcvd += self._sock.recv(256) + pos = self._rcvd.find(b'\n') + rsp = self._rcvd[:pos] + self._rcvd = self._rcvd[pos+1:] + except ConnectionResetError as e: + RapidLog.error('Pipe reset by Prox instance: traffic too high?') + raise return rsp.decode() - |