summaryrefslogtreecommitdiffstats
path: root/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
diff options
context:
space:
mode:
Diffstat (limited to 'VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py')
-rw-r--r--VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py293
1 files changed, 293 insertions, 0 deletions
diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
new file mode 100644
index 00000000..8754ebc4
--- /dev/null
+++ b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
@@ -0,0 +1,293 @@
+##
+## Copyright (c) 2010-2020 Intel Corporation
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+
+from __future__ import print_function
+from __future__ import division
+
+from builtins import map
+from builtins import range
+from past.utils import old_div
+from builtins import object
+import os
+import time
+import subprocess
+import socket
+from rapid_log import RapidLog
+from rapid_sshclient import SSHClient
+
+class prox_ctrl(object):
+ def __init__(self, ip, key=None, user=None, password = None):
+ self._ip = ip
+ self._key = key
+ self._user = user
+ self._password = password
+ self._proxsock = []
+ self._sshclient = SSHClient(ip = ip, user = user, password = password,
+ rsa_private_key = key, timeout = None)
+
+ def ip(self):
+ return self._ip
+
+ def test_connection(self):
+ attempts = 1
+ RapidLog.debug("Trying to connect to machine \
+ on %s, attempt: %d" % (self._ip, attempts))
+ while True:
+ try:
+ if (self.run_cmd('test -e /opt/rapid/system_ready_for_rapid \
+ && echo exists')):
+ break
+ time.sleep(2)
+ except RuntimeWarning as ex:
+ RapidLog.debug("RuntimeWarning %d:\n%s"
+ % (ex.returncode, ex.output.strip()))
+ attempts += 1
+ if attempts > 20:
+ RapidLog.exception("Failed to connect to instance after %d\
+ attempts:\n%s" % (attempts, ex))
+ time.sleep(2)
+ RapidLog.debug("Trying to connect to machine \
+ on %s, attempt: %d" % (self._ip, attempts))
+ RapidLog.debug("Connected to machine on %s" % self._ip)
+
+ def connect_socket(self):
+ attempts = 1
+ RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
+ attempt: %d" % (self._ip, attempts))
+ sock = None
+ while True:
+ sock = self.prox_sock()
+ if sock is not None:
+ break
+ attempts += 1
+ if attempts > 20:
+ RapidLog.exception("Failed to connect to PROX on %s after %d \
+ attempts" % (self._ip, attempts))
+ time.sleep(2)
+ RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
+ attempt: %d" % (self._ip, attempts))
+ RapidLog.info("Connected to PROX on %s" % self._ip)
+ return sock
+
+ def close(self):
+ for sock in self._proxsock:
+ sock.quit()
+
+ def run_cmd(self, command):
+ self._sshclient.run_cmd(command)
+ return self._sshclient.get_output()
+
+ def prox_sock(self, port=8474):
+ """Connect to the PROX instance on remote system.
+ Return a prox_sock object on success, None on failure.
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ sock.connect((self._ip, port))
+ prox = prox_sock(sock)
+ self._proxsock.append(prox)
+ return prox
+ except:
+ return None
+
+ def scp_put(self, src, dst):
+ self._sshclient.scp_put(src, dst)
+ RapidLog.info("Copying from {} to {}:{}".format(src, self._ip, dst))
+
+ def scp_get(self, src, dst):
+ self._sshclient.scp_get('/home/' + self._user + src, dst)
+ RapidLog.info("Copying from {}:/home/{}{} to {}".format(self._ip,
+ self._user, src, dst))
+
+class prox_sock(object):
+ def __init__(self, sock):
+ self._sock = sock
+ self._rcvd = b''
+
+ def __del__(self):
+ if self._sock is not None:
+ self._sock.close()
+ self._sock = None
+
+ def start(self, cores):
+ self._send('start %s' % ','.join(map(str, cores)))
+
+ def stop(self, cores):
+ self._send('stop %s' % ','.join(map(str, cores)))
+
+ def speed(self, speed, cores, tasks=[0]):
+ for core in cores:
+ for task in tasks:
+ self._send('speed %s %s %s' % (core, task, speed))
+
+ def reset_stats(self):
+ self._send('reset stats')
+
+ def lat_stats(self, cores, tasks=[0]):
+ result = {}
+ result['lat_min'] = 999999999
+ result['lat_max'] = result['lat_avg'] = 0
+ result['buckets'] = [0] * 128
+ result['mis_ordered'] = 0
+ result['extent'] = 0
+ result['duplicate'] = 0
+ number_tasks_returning_stats = 0
+ self._send('lat all stats %s %s' % (','.join(map(str, cores)),
+ ','.join(map(str, tasks))))
+ for core in cores:
+ for task in tasks:
+ stats = self._recv().split(',')
+ if 'is not measuring' in stats[0]:
+ continue
+ if stats[0].startswith('error'):
+ RapidLog.critical("lat stats error: unexpected reply from PROX\
+ (potential incompatibility between scripts and PROX)")
+ raise Exception("lat stats error")
+ number_tasks_returning_stats += 1
+ result['lat_min'] = min(int(stats[0]),result['lat_min'])
+ result['lat_max'] = max(int(stats[1]),result['lat_max'])
+ result['lat_avg'] += int(stats[2])
+ #min_since begin = int(stats[3])
+ #max_since_begin = int(stats[4])
+ result['lat_tsc'] = int(stats[5])
+ # Taking the last tsc as the timestamp since
+ # PROX will return the same tsc for each
+ # core/task combination
+ result['lat_hz'] = int(stats[6])
+ #coreid = int(stats[7])
+ #taskid = int(stats[8])
+ result['mis_ordered'] += int(stats[9])
+ result['extent'] += int(stats[10])
+ result['duplicate'] += int(stats[11])
+ stats = self._recv().split(':')
+ if stats[0].startswith('error'):
+ RapidLog.critical("lat stats error: unexpected lat bucket \
+ reply (potential incompatibility between scripts \
+ and PROX)")
+ raise Exception("lat bucket reply error")
+ result['buckets'][0] = int(stats[1])
+ for i in range(1, 128):
+ stats = self._recv().split(':')
+ result['buckets'][i] += int(stats[1])
+ result['lat_avg'] = old_div(result['lat_avg'],
+ number_tasks_returning_stats)
+ self._send('stats latency(0).used')
+ used = float(self._recv())
+ self._send('stats latency(0).total')
+ total = float(self._recv())
+ result['lat_used'] = old_div(used,total)
+ return (result)
+
+ def irq_stats(self, core, bucket, task=0):
+ self._send('stats task.core(%s).task(%s).irq(%s)' %
+ (core, task, bucket))
+ stats = self._recv().split(',')
+ return int(stats[0])
+
+ def show_irq_buckets(self, core, task=0):
+ rx = tx = drop = tsc = hz = 0
+ self._send('show irq buckets %s %s' % (core,task))
+ buckets = self._recv().split(';')
+ buckets = buckets[:-1]
+ return buckets
+
+ def core_stats(self, cores, tasks=[0]):
+ rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
+ self._send('dp core stats %s %s' % (','.join(map(str, cores)),
+ ','.join(map(str, tasks))))
+ for core in cores:
+ for task in tasks:
+ stats = self._recv().split(',')
+ if stats[0].startswith('error'):
+ if stats[0].startswith('error: invalid syntax'):
+ RapidLog.critical("dp core stats error: unexpected \
+ invalid syntax (potential incompatibility \
+ between scripts and PROX)")
+ raise Exception("dp core stats error")
+ continue
+ rx += int(stats[0])
+ tx += int(stats[1])
+ rx_non_dp += int(stats[2])
+ tx_non_dp += int(stats[3])
+ drop += int(stats[4])
+ tx_fail += int(stats[5])
+ tsc = int(stats[6])
+ hz = int(stats[7])
+ return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
+
+ def multi_port_stats(self, ports=[0]):
+ rx = tx = port_id = tsc = no_mbufs = errors = 0
+ self._send('multi port stats %s' % (','.join(map(str, ports))))
+ result = self._recv().split(';')
+ if result[0].startswith('error'):
+ RapidLog.critical("multi port stats error: unexpected invalid \
+ syntax (potential incompatibility between scripts and \
+ PROX)")
+ raise Exception("multi port stats error")
+ for statistics in result:
+ stats = statistics.split(',')
+ port_id = int(stats[0])
+ rx += int(stats[1])
+ tx += int(stats[2])
+ no_mbufs += int(stats[3])
+ errors += int(stats[4])
+ tsc = int(stats[5])
+ return rx, tx, no_mbufs, errors, tsc
+
+ def set_random(self, cores, task, offset, mask, length):
+ self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
+ task, offset, mask, length))
+
+ def set_size(self, cores, task, pkt_size):
+ self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
+ pkt_size))
+
+ def set_imix(self, cores, task, imix):
+ self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
+ ','.join(map(str,imix))))
+
+ def set_value(self, cores, task, offset, value, length):
+ self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
+ task, offset, value, length))
+
+ def quit_prox(self):
+ self._send('quit')
+
+ def _send(self, cmd):
+ """Append LF and send command to the PROX instance."""
+ if self._sock is None:
+ raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
+ try:
+ self._sock.sendall(cmd.encode() + b'\n')
+ except ConnectionResetError as e:
+ RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+ raise
+
+ def _recv(self):
+ """Receive response from PROX instance, return it with LF removed."""
+ if self._sock is None:
+ raise RuntimeError("PROX socket closed, cannot receive anymore")
+ try:
+ pos = self._rcvd.find(b'\n')
+ while pos == -1:
+ self._rcvd += self._sock.recv(256)
+ pos = self._rcvd.find(b'\n')
+ rsp = self._rcvd[:pos]
+ self._rcvd = self._rcvd[pos+1:]
+ except ConnectionResetError as e:
+ RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+ raise
+ return rsp.decode()