aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf/prox_helpers.py')
-rw-r--r--yardstick/network_services/vnf_generic/vnf/prox_helpers.py351
1 files changed, 299 insertions, 52 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
index 321c05779..3507315f2 100644
--- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
+++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2017 Intel Corporation
+# Copyright (c) 2018-2019 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import re
import select
import socket
import time
+
from collections import OrderedDict, namedtuple
from contextlib import contextmanager
from itertools import repeat, chain
@@ -325,7 +326,28 @@ class ProxSocketHelper(object):
return ret_str, False
- def get_data(self, pkt_dump_only=False, timeout=0.01):
+ def get_string(self, pkt_dump_only=False, timeout=0.01):
+
+ def is_ready_string():
+ # recv() is blocking, so avoid calling it when no data is waiting.
+ ready = select.select([self._sock], [], [], timeout)
+ return bool(ready[0])
+
+ status = False
+ ret_str = ""
+ while status is False:
+ for status in iter(is_ready_string, False):
+ decoded_data = self._sock.recv(256).decode('utf-8')
+ ret_str, done = self._parse_socket_data(decoded_data,
+ pkt_dump_only)
+ if (done):
+ status = True
+ break
+
+ LOG.debug("Received data from socket: [%s]", ret_str)
+ return status, ret_str
+
+ def get_data(self, pkt_dump_only=False, timeout=10.0):
""" read data from the socket """
# This method behaves slightly differently depending on whether it is
@@ -394,7 +416,6 @@ class ProxSocketHelper(object):
""" stop all cores on the remote instance """
LOG.debug("Stop all")
self.put_command("stop all\n")
- time.sleep(3)
def stop(self, cores, task=''):
""" stop specific cores on the remote instance """
@@ -406,7 +427,6 @@ class ProxSocketHelper(object):
LOG.debug("Stopping cores %s", tmpcores)
self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task))
- time.sleep(3)
def start_all(self):
""" start all cores on the remote instance """
@@ -423,13 +443,11 @@ class ProxSocketHelper(object):
LOG.debug("Starting cores %s", tmpcores)
self.put_command("start {}\n".format(join_non_strings(',', tmpcores)))
- time.sleep(3)
def reset_stats(self):
""" reset the statistics on the remote instance """
LOG.debug("Reset stats")
self.put_command("reset stats\n")
- time.sleep(1)
def _run_template_over_cores(self, template, cores, *args):
for core in cores:
@@ -440,7 +458,6 @@ class ProxSocketHelper(object):
LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
pkt_size -= 4
self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
- time.sleep(1)
def set_value(self, cores, offset, value, length):
""" set value on the remote instance """
@@ -544,50 +561,173 @@ class ProxSocketHelper(object):
tsc = int(ret[3])
return rx, tx, drop, tsc
- def multi_port_stats(self, ports):
- """get counter values from all ports port"""
+ def irq_core_stats(self, cores_tasks):
+ """ get IRQ stats per core"""
+
+ stat = {}
+ core = 0
+ task = 0
+ for core, task in cores_tasks:
+ self.put_command("stats task.core({}).task({}).max_irq,task.core({}).task({}).irq(0),"
+ "task.core({}).task({}).irq(1),task.core({}).task({}).irq(2),"
+ "task.core({}).task({}).irq(3),task.core({}).task({}).irq(4),"
+ "task.core({}).task({}).irq(5),task.core({}).task({}).irq(6),"
+ "task.core({}).task({}).irq(7),task.core({}).task({}).irq(8),"
+ "task.core({}).task({}).irq(9),task.core({}).task({}).irq(10),"
+ "task.core({}).task({}).irq(11),task.core({}).task({}).irq(12)"
+ "\n".format(core, task, core, task, core, task, core, task,
+ core, task, core, task, core, task, core, task,
+ core, task, core, task, core, task, core, task,
+ core, task, core, task))
+ in_data_str = self.get_data().split(",")
+ ret = [try_int(s, 0) for s in in_data_str]
+ key = "core_" + str(core)
+ try:
+ stat[key] = {"cpu": core, "max_irq": ret[0], "bucket_0" : ret[1],
+ "bucket_1" : ret[2], "bucket_2" : ret[3],
+ "bucket_3" : ret[4], "bucket_4" : ret[5],
+ "bucket_5" : ret[6], "bucket_6" : ret[7],
+ "bucket_7" : ret[8], "bucket_8" : ret[9],
+ "bucket_9" : ret[10], "bucket_10" : ret[11],
+ "bucket_11" : ret[12], "bucket_12" : ret[13],
+ "overflow": ret[10] + ret[11] + ret[12] + ret[13]}
+ except (KeyError, IndexError):
+ LOG.error("Corrupted PACKET %s", in_data_str)
+
+ return stat
- ports_str = ""
- for port in ports:
- ports_str = ports_str + str(port) + ","
- ports_str = ports_str[:-1]
+ def multi_port_stats(self, ports):
+ """get counter values from all ports at once"""
+ ports_str = ",".join(map(str, ports))
ports_all_data = []
tot_result = [0] * len(ports)
- retry_counter = 0
port_index = 0
- while (len(ports) is not len(ports_all_data)) and (retry_counter < 10):
+ while (len(ports) is not len(ports_all_data)):
self.put_command("multi port stats {}\n".format(ports_str))
- ports_all_data = self.get_data().split(";")
+ status, ports_all_data_str = self.get_string()
+
+ if not status:
+ return False, []
+
+ ports_all_data = ports_all_data_str.split(";")
if len(ports) is len(ports_all_data):
for port_data_str in ports_all_data:
+ tmpdata = []
try:
- tot_result[port_index] = [try_int(s, 0) for s in port_data_str.split(",")]
+ tmpdata = [try_int(s, 0) for s in port_data_str.split(",")]
except (IndexError, TypeError):
- LOG.error("Port Index error %d %s - retrying ", port_index, port_data_str)
-
- if (len(tot_result[port_index]) is not 6) or \
- tot_result[port_index][0] is not ports[port_index]:
- ports_all_data = []
- tot_result = [0] * len(ports)
- port_index = 0
- time.sleep(0.1)
+ LOG.error("Unpacking data error %s", port_data_str)
+ return False, []
+
+ if (len(tmpdata) < 6) or tmpdata[0] not in ports:
LOG.error("Corrupted PACKET %s - retrying", port_data_str)
- break
+ return False, []
else:
+ tot_result[port_index] = tmpdata
port_index = port_index + 1
else:
LOG.error("Empty / too much data - retry -%s-", ports_all_data)
- ports_all_data = []
- tot_result = [0] * len(ports)
- port_index = 0
- time.sleep(0.1)
+ return False, []
- retry_counter = retry_counter + 1
- return tot_result
+ LOG.debug("Multi port packet ..OK.. %s", tot_result)
+ return True, tot_result
+
+ @staticmethod
+ def multi_port_stats_tuple(stats, ports):
+ """
+ Create a statistics tuple from port stats.
+
+ Returns a dict with contains the port stats indexed by port name
+
+ :param stats: (List) - List of List of port stats in pps
+ :param ports (Iterator) - to List of Ports
+
+ :return: (Dict) of port stats indexed by port_name
+ """
+
+ samples = {}
+ port_names = {}
+ try:
+ port_names = {port_num: port_name for port_name, port_num in ports}
+ except (TypeError, IndexError, KeyError):
+ LOG.critical("Ports are not initialized or number of port is ZERO ... CRITICAL ERROR")
+ return {}
+
+ try:
+ for stat in stats:
+ port_num = stat[0]
+ samples[port_names[port_num]] = {
+ "in_packets": stat[1],
+ "out_packets": stat[2]}
+ except (TypeError, IndexError, KeyError):
+ LOG.error("Ports data and samples data is incompatable ....")
+ return {}
+
+ return samples
+
+ @staticmethod
+ def multi_port_stats_diff(prev_stats, new_stats, hz):
+ """
+ Create a statistics tuple from difference between prev port stats
+ and current port stats. And store results in pps.
+
+ :param prev_stats: (List) - Previous List of port statistics
+ :param new_stats: (List) - Current List of port statistics
+ :param hz (float) - speed of system in Hz
+
+ :return: sample (List) - Difference of prev_port_stats and
+ new_port_stats in pps
+ """
+
+ RX_TOTAL_INDEX = 1
+ TX_TOTAL_INDEX = 2
+ TSC_INDEX = 5
+
+ stats = []
+
+ if len(prev_stats) is not len(new_stats):
+ for port_index, stat in enumerate(new_stats):
+ stats.append([port_index, float(0), float(0), 0, 0, 0])
+ return stats
+
+ try:
+ for port_index, stat in enumerate(new_stats):
+ if stat[RX_TOTAL_INDEX] > prev_stats[port_index][RX_TOTAL_INDEX]:
+ rx_total = stat[RX_TOTAL_INDEX] - \
+ prev_stats[port_index][RX_TOTAL_INDEX]
+ else:
+ rx_total = stat[RX_TOTAL_INDEX]
+
+ if stat[TX_TOTAL_INDEX] > prev_stats[port_index][TX_TOTAL_INDEX]:
+ tx_total = stat[TX_TOTAL_INDEX] - prev_stats[port_index][TX_TOTAL_INDEX]
+ else:
+ tx_total = stat[TX_TOTAL_INDEX]
+
+ if stat[TSC_INDEX] > prev_stats[port_index][TSC_INDEX]:
+ tsc = stat[TSC_INDEX] - prev_stats[port_index][TSC_INDEX]
+ else:
+ tsc = stat[TSC_INDEX]
+
+ if tsc is 0:
+ rx_total = tx_total = float(0)
+ else:
+ if hz is 0:
+ LOG.error("HZ is ZERO ..")
+ rx_total = tx_total = float(0)
+ else:
+ rx_total = float(rx_total * hz / tsc)
+ tx_total = float(tx_total * hz / tsc)
+
+ stats.append([port_index, rx_total, tx_total, 0, 0, tsc])
+ except (TypeError, IndexError, KeyError):
+ stats = []
+ LOG.info("Current Port Stats incompatable to previous Port stats .. Discarded")
+
+ return stats
def port_stats(self, ports):
"""get counter values from a specific port"""
@@ -649,7 +789,6 @@ class ProxSocketHelper(object):
self.put_command("quit_force\n")
time.sleep(3)
-
_LOCAL_OBJECT = object()
@@ -731,6 +870,30 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
file_str[1] = self.additional_files[base_name]
return '"'.join(file_str)
+ def _make_core_list(self, inputStr):
+
+ my_input = inputStr.split("core ", 1)[1]
+ ok_list = set()
+
+ substrs = [x.strip() for x in my_input.split(',')]
+ for i in substrs:
+ try:
+ ok_list.add(int(i))
+
+ except ValueError:
+ try:
+ substr = [int(k.strip()) for k in i.split('-')]
+ if len(substr) > 1:
+ startstr = substr[0]
+ endstr = substr[len(substr) - 1]
+ for z in range(startstr, endstr + 1):
+ ok_list.add(z)
+ except ValueError:
+ LOG.error("Error in cores list ... resuming ")
+ return ok_list
+
+ return ok_list
+
def generate_prox_config_file(self, config_path):
sections = []
prox_config = ConfigParser(config_path, sections)
@@ -750,6 +913,18 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
if section_data[0] == "mac":
section_data[1] = "hardware"
+ # adjust for range of cores
+ new_sections = []
+ for section_name, section in sections:
+ if section_name.startswith('core') and section_name.find('$') == -1:
+ core_list = self._make_core_list(section_name)
+ for core in core_list:
+ new_sections.append(["core " + str(core), section])
+ else:
+ new_sections.append([section_name, section])
+
+ sections = new_sections
+
# search for dst mac
for _, section in sections:
for section_data in section:
@@ -956,6 +1131,8 @@ class ProxResourceHelper(ClientResourceHelper):
self.step_delta = 1
self.step_time = 0.5
self._test_type = None
+ self.prev_multi_port = []
+ self.prev_hz = 0
@property
def sut(self):
@@ -969,7 +1146,7 @@ class ProxResourceHelper(ClientResourceHelper):
self._test_type = self.setup_helper.find_in_section('global', 'name', None)
return self._test_type
- def run_traffic(self, traffic_profile, *args):
+ def run_traffic(self, traffic_profile):
self._queue.cancel_join_thread()
self.lower = 0.0
self.upper = 100.0
@@ -994,11 +1171,40 @@ class ProxResourceHelper(ClientResourceHelper):
def collect_collectd_kpi(self):
return self._collect_resource_kpi()
+ def collect_live_stats(self):
+ ports = []
+ for _, port_num in self.vnfd_helper.ports_iter():
+ ports.append(port_num)
+
+ ok, curr_port_stats = self.sut.multi_port_stats(ports)
+ if not ok:
+ return False, {}
+
+ hz = self.sut.hz()
+ if hz is 0:
+ hz = self.prev_hz
+ else:
+ self.prev_hz = hz
+
+ new_all_port_stats = \
+ self.sut.multi_port_stats_diff(self.prev_multi_port, curr_port_stats, hz)
+
+ self.prev_multi_port = curr_port_stats
+
+ live_stats = self.sut.multi_port_stats_tuple(new_all_port_stats,
+ self.vnfd_helper.ports_iter())
+ return True, live_stats
+
def collect_kpi(self):
result = super(ProxResourceHelper, self).collect_kpi()
# add in collectd kpis manually
if result:
result['collect_stats'] = self._collect_resource_kpi()
+
+ ok, live_stats = self.collect_live_stats()
+ if ok:
+ result.update({'live_stats': live_stats})
+
return result
def terminate(self):
@@ -1070,41 +1276,70 @@ class ProxDataHelper(object):
def totals_and_pps(self):
if self._totals_and_pps is None:
rx_total = tx_total = 0
- all_ports = self.sut.multi_port_stats(range(self.port_count))
- for port in all_ports:
- rx_total = rx_total + port[1]
- tx_total = tx_total + port[2]
- requested_pps = self.value / 100.0 * self.line_rate_to_pps()
- self._totals_and_pps = rx_total, tx_total, requested_pps
+ ok = False
+ timeout = time.time() + constants.RETRY_TIMEOUT
+ while not ok:
+ ok, all_ports = self.sut.multi_port_stats([
+ self.vnfd_helper.port_num(port_name)
+ for port_name in self.vnfd_helper.port_pairs.all_ports])
+ if time.time() > timeout:
+ break
+ if ok:
+ for port in all_ports:
+ rx_total = rx_total + port[1]
+ tx_total = tx_total + port[2]
+ requested_pps = self.value / 100.0 * self.line_rate_to_pps()
+ self._totals_and_pps = rx_total, tx_total, requested_pps
return self._totals_and_pps
@property
def rx_total(self):
- return self.totals_and_pps[0]
+ try:
+ ret_val = self.totals_and_pps[0]
+ except (AttributeError, ValueError, TypeError, LookupError):
+ ret_val = 0
+ return ret_val
@property
def tx_total(self):
- return self.totals_and_pps[1]
+ try:
+ ret_val = self.totals_and_pps[1]
+ except (AttributeError, ValueError, TypeError, LookupError):
+ ret_val = 0
+ return ret_val
@property
def requested_pps(self):
- return self.totals_and_pps[2]
+ try:
+ ret_val = self.totals_and_pps[2]
+ except (AttributeError, ValueError, TypeError, LookupError):
+ ret_val = 0
+ return ret_val
@property
def samples(self):
samples = {}
ports = []
- port_names = []
+ port_names = {}
for port_name, port_num in self.vnfd_helper.ports_iter():
ports.append(port_num)
- port_names.append(port_name)
-
- results = self.sut.multi_port_stats(ports)
- for result in results:
- port_num = result[0]
- samples[port_names[port_num]] = {
- "in_packets": result[1],
- "out_packets": result[2]}
+ port_names[port_num] = port_name
+
+ ok = False
+ timeout = time.time() + constants.RETRY_TIMEOUT
+ while not ok:
+ ok, results = self.sut.multi_port_stats(ports)
+ if time.time() > timeout:
+ break
+ if ok:
+ for result in results:
+ port_num = result[0]
+ try:
+ samples[port_names[port_num]] = {
+ "in_packets": result[1],
+ "out_packets": result[2]}
+ except (IndexError, KeyError):
+ pass
return samples
def __enter__(self):
@@ -1902,3 +2137,15 @@ class ProxlwAFTRProfileHelper(ProxProfileHelper):
data_helper.latency = self.get_latency()
return data_helper.result_tuple, data_helper.samples
+
+
+class ProxIrqProfileHelper(ProxProfileHelper):
+
+ __prox_profile_type__ = "IRQ Query"
+
+ def __init__(self, resource_helper):
+ super(ProxIrqProfileHelper, self).__init__(resource_helper)
+ self._cores_tuple = None
+ self._ports_tuple = None
+ self.step_delta = 5
+ self.step_time = 0.5