diff options
Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf/prox_helpers.py')
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/prox_helpers.py | 353 |
1 files changed, 300 insertions, 53 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 3241719e8..3507315f2 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import re import select import socket import time + from collections import OrderedDict, namedtuple from contextlib import contextmanager from itertools import repeat, chain @@ -325,7 +326,28 @@ class ProxSocketHelper(object): return ret_str, False - def get_data(self, pkt_dump_only=False, timeout=0.01): + def get_string(self, pkt_dump_only=False, timeout=0.01): + + def is_ready_string(): + # recv() is blocking, so avoid calling it when no data is waiting. + ready = select.select([self._sock], [], [], timeout) + return bool(ready[0]) + + status = False + ret_str = "" + while status is False: + for status in iter(is_ready_string, False): + decoded_data = self._sock.recv(256).decode('utf-8') + ret_str, done = self._parse_socket_data(decoded_data, + pkt_dump_only) + if (done): + status = True + break + + LOG.debug("Received data from socket: [%s]", ret_str) + return status, ret_str + + def get_data(self, pkt_dump_only=False, timeout=10.0): """ read data from the socket """ # This method behaves slightly differently depending on whether it is @@ -394,7 +416,6 @@ class ProxSocketHelper(object): """ stop all cores on the remote instance """ LOG.debug("Stop all") self.put_command("stop all\n") - time.sleep(3) def stop(self, cores, task=''): """ stop specific cores on the remote instance """ @@ -406,7 +427,6 @@ class ProxSocketHelper(object): LOG.debug("Stopping cores %s", tmpcores) self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task)) - time.sleep(3) def start_all(self): """ start all cores on the remote instance """ @@ -423,13 +443,11 @@ class ProxSocketHelper(object): LOG.debug("Starting cores %s", tmpcores) self.put_command("start {}\n".format(join_non_strings(',', tmpcores))) - time.sleep(3) def reset_stats(self): """ reset the statistics on the remote instance """ LOG.debug("Reset stats") self.put_command("reset stats\n") - time.sleep(1) def _run_template_over_cores(self, template, cores, *args): for core in cores: @@ -440,7 +458,6 @@ class ProxSocketHelper(object): LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size) pkt_size -= 4 self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size) - time.sleep(1) def set_value(self, cores, offset, value, length): """ set value on the remote instance """ @@ -544,50 +561,173 @@ class ProxSocketHelper(object): tsc = int(ret[3]) return rx, tx, drop, tsc - def multi_port_stats(self, ports): - """get counter values from all ports port""" + def irq_core_stats(self, cores_tasks): + """ get IRQ stats per core""" + + stat = {} + core = 0 + task = 0 + for core, task in cores_tasks: + self.put_command("stats task.core({}).task({}).max_irq,task.core({}).task({}).irq(0)," + "task.core({}).task({}).irq(1),task.core({}).task({}).irq(2)," + "task.core({}).task({}).irq(3),task.core({}).task({}).irq(4)," + "task.core({}).task({}).irq(5),task.core({}).task({}).irq(6)," + "task.core({}).task({}).irq(7),task.core({}).task({}).irq(8)," + "task.core({}).task({}).irq(9),task.core({}).task({}).irq(10)," + "task.core({}).task({}).irq(11),task.core({}).task({}).irq(12)" + "\n".format(core, task, core, task, core, task, core, task, + core, task, core, task, core, task, core, task, + core, task, core, task, core, task, core, task, + core, task, core, task)) + in_data_str = self.get_data().split(",") + ret = [try_int(s, 0) for s in in_data_str] + key = "core_" + str(core) + try: + stat[key] = {"cpu": core, "max_irq": ret[0], "bucket_0" : ret[1], + "bucket_1" : ret[2], "bucket_2" : ret[3], + "bucket_3" : ret[4], "bucket_4" : ret[5], + "bucket_5" : ret[6], "bucket_6" : ret[7], + "bucket_7" : ret[8], "bucket_8" : ret[9], + "bucket_9" : ret[10], "bucket_10" : ret[11], + "bucket_11" : ret[12], "bucket_12" : ret[13], + "overflow": ret[10] + ret[11] + ret[12] + ret[13]} + except (KeyError, IndexError): + LOG.error("Corrupted PACKET %s", in_data_str) + + return stat - ports_str = "" - for port in ports: - ports_str = ports_str + str(port) + "," - ports_str = ports_str[:-1] + def multi_port_stats(self, ports): + """get counter values from all ports at once""" + ports_str = ",".join(map(str, ports)) ports_all_data = [] tot_result = [0] * len(ports) - retry_counter = 0 port_index = 0 - while (len(ports) is not len(ports_all_data)) and (retry_counter < 10): + while (len(ports) is not len(ports_all_data)): self.put_command("multi port stats {}\n".format(ports_str)) - ports_all_data = self.get_data().split(";") + status, ports_all_data_str = self.get_string() + + if not status: + return False, [] + + ports_all_data = ports_all_data_str.split(";") if len(ports) is len(ports_all_data): for port_data_str in ports_all_data: + tmpdata = [] try: - tot_result[port_index] = [try_int(s, 0) for s in port_data_str.split(",")] + tmpdata = [try_int(s, 0) for s in port_data_str.split(",")] except (IndexError, TypeError): - LOG.error("Port Index error %d %s - retrying ", port_index, port_data_str) - - if (len(tot_result[port_index]) is not 6) or \ - tot_result[port_index][0] is not ports[port_index]: - ports_all_data = [] - tot_result = [0] * len(ports) - port_index = 0 - time.sleep(0.1) + LOG.error("Unpacking data error %s", port_data_str) + return False, [] + + if (len(tmpdata) < 6) or tmpdata[0] not in ports: LOG.error("Corrupted PACKET %s - retrying", port_data_str) - break + return False, [] else: + tot_result[port_index] = tmpdata port_index = port_index + 1 else: LOG.error("Empty / too much data - retry -%s-", ports_all_data) - ports_all_data = [] - tot_result = [0] * len(ports) - port_index = 0 - time.sleep(0.1) + return False, [] - retry_counter = retry_counter + 1 - return tot_result + LOG.debug("Multi port packet ..OK.. %s", tot_result) + return True, tot_result + + @staticmethod + def multi_port_stats_tuple(stats, ports): + """ + Create a statistics tuple from port stats. + + Returns a dict with contains the port stats indexed by port name + + :param stats: (List) - List of List of port stats in pps + :param ports (Iterator) - to List of Ports + + :return: (Dict) of port stats indexed by port_name + """ + + samples = {} + port_names = {} + try: + port_names = {port_num: port_name for port_name, port_num in ports} + except (TypeError, IndexError, KeyError): + LOG.critical("Ports are not initialized or number of port is ZERO ... CRITICAL ERROR") + return {} + + try: + for stat in stats: + port_num = stat[0] + samples[port_names[port_num]] = { + "in_packets": stat[1], + "out_packets": stat[2]} + except (TypeError, IndexError, KeyError): + LOG.error("Ports data and samples data is incompatable ....") + return {} + + return samples + + @staticmethod + def multi_port_stats_diff(prev_stats, new_stats, hz): + """ + Create a statistics tuple from difference between prev port stats + and current port stats. And store results in pps. + + :param prev_stats: (List) - Previous List of port statistics + :param new_stats: (List) - Current List of port statistics + :param hz (float) - speed of system in Hz + + :return: sample (List) - Difference of prev_port_stats and + new_port_stats in pps + """ + + RX_TOTAL_INDEX = 1 + TX_TOTAL_INDEX = 2 + TSC_INDEX = 5 + + stats = [] + + if len(prev_stats) is not len(new_stats): + for port_index, stat in enumerate(new_stats): + stats.append([port_index, float(0), float(0), 0, 0, 0]) + return stats + + try: + for port_index, stat in enumerate(new_stats): + if stat[RX_TOTAL_INDEX] > prev_stats[port_index][RX_TOTAL_INDEX]: + rx_total = stat[RX_TOTAL_INDEX] - \ + prev_stats[port_index][RX_TOTAL_INDEX] + else: + rx_total = stat[RX_TOTAL_INDEX] + + if stat[TX_TOTAL_INDEX] > prev_stats[port_index][TX_TOTAL_INDEX]: + tx_total = stat[TX_TOTAL_INDEX] - prev_stats[port_index][TX_TOTAL_INDEX] + else: + tx_total = stat[TX_TOTAL_INDEX] + + if stat[TSC_INDEX] > prev_stats[port_index][TSC_INDEX]: + tsc = stat[TSC_INDEX] - prev_stats[port_index][TSC_INDEX] + else: + tsc = stat[TSC_INDEX] + + if tsc is 0: + rx_total = tx_total = float(0) + else: + if hz is 0: + LOG.error("HZ is ZERO ..") + rx_total = tx_total = float(0) + else: + rx_total = float(rx_total * hz / tsc) + tx_total = float(tx_total * hz / tsc) + + stats.append([port_index, rx_total, tx_total, 0, 0, tsc]) + except (TypeError, IndexError, KeyError): + stats = [] + LOG.info("Current Port Stats incompatable to previous Port stats .. Discarded") + + return stats def port_stats(self, ports): """get counter values from a specific port""" @@ -649,7 +789,6 @@ class ProxSocketHelper(object): self.put_command("quit_force\n") time.sleep(3) - _LOCAL_OBJECT = object() @@ -731,6 +870,30 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): file_str[1] = self.additional_files[base_name] return '"'.join(file_str) + def _make_core_list(self, inputStr): + + my_input = inputStr.split("core ", 1)[1] + ok_list = set() + + substrs = [x.strip() for x in my_input.split(',')] + for i in substrs: + try: + ok_list.add(int(i)) + + except ValueError: + try: + substr = [int(k.strip()) for k in i.split('-')] + if len(substr) > 1: + startstr = substr[0] + endstr = substr[len(substr) - 1] + for z in range(startstr, endstr + 1): + ok_list.add(z) + except ValueError: + LOG.error("Error in cores list ... resuming ") + return ok_list + + return ok_list + def generate_prox_config_file(self, config_path): sections = [] prox_config = ConfigParser(config_path, sections) @@ -750,6 +913,18 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): if section_data[0] == "mac": section_data[1] = "hardware" + # adjust for range of cores + new_sections = [] + for section_name, section in sections: + if section_name.startswith('core') and section_name.find('$') == -1: + core_list = self._make_core_list(section_name) + for core in core_list: + new_sections.append(["core " + str(core), section]) + else: + new_sections.append([section_name, section]) + + sections = new_sections + # search for dst mac for _, section in sections: for section_data in section: @@ -956,6 +1131,8 @@ class ProxResourceHelper(ClientResourceHelper): self.step_delta = 1 self.step_time = 0.5 self._test_type = None + self.prev_multi_port = [] + self.prev_hz = 0 @property def sut(self): @@ -969,7 +1146,7 @@ class ProxResourceHelper(ClientResourceHelper): self._test_type = self.setup_helper.find_in_section('global', 'name', None) return self._test_type - def run_traffic(self, traffic_profile, *args): + def run_traffic(self, traffic_profile): self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 @@ -984,7 +1161,7 @@ class ProxResourceHelper(ClientResourceHelper): def _run_traffic_once(self, traffic_profile): traffic_profile.execute_traffic(self) - if traffic_profile.done: + if traffic_profile.done.is_set(): self._queue.put({'done': True}) LOG.debug("tg_prox done") self._terminated.value = 1 @@ -994,11 +1171,40 @@ class ProxResourceHelper(ClientResourceHelper): def collect_collectd_kpi(self): return self._collect_resource_kpi() + def collect_live_stats(self): + ports = [] + for _, port_num in self.vnfd_helper.ports_iter(): + ports.append(port_num) + + ok, curr_port_stats = self.sut.multi_port_stats(ports) + if not ok: + return False, {} + + hz = self.sut.hz() + if hz is 0: + hz = self.prev_hz + else: + self.prev_hz = hz + + new_all_port_stats = \ + self.sut.multi_port_stats_diff(self.prev_multi_port, curr_port_stats, hz) + + self.prev_multi_port = curr_port_stats + + live_stats = self.sut.multi_port_stats_tuple(new_all_port_stats, + self.vnfd_helper.ports_iter()) + return True, live_stats + def collect_kpi(self): result = super(ProxResourceHelper, self).collect_kpi() # add in collectd kpis manually if result: result['collect_stats'] = self._collect_resource_kpi() + + ok, live_stats = self.collect_live_stats() + if ok: + result.update({'live_stats': live_stats}) + return result def terminate(self): @@ -1070,41 +1276,70 @@ class ProxDataHelper(object): def totals_and_pps(self): if self._totals_and_pps is None: rx_total = tx_total = 0 - all_ports = self.sut.multi_port_stats(range(self.port_count)) - for port in all_ports: - rx_total = rx_total + port[1] - tx_total = tx_total + port[2] - requested_pps = self.value / 100.0 * self.line_rate_to_pps() - self._totals_and_pps = rx_total, tx_total, requested_pps + ok = False + timeout = time.time() + constants.RETRY_TIMEOUT + while not ok: + ok, all_ports = self.sut.multi_port_stats([ + self.vnfd_helper.port_num(port_name) + for port_name in self.vnfd_helper.port_pairs.all_ports]) + if time.time() > timeout: + break + if ok: + for port in all_ports: + rx_total = rx_total + port[1] + tx_total = tx_total + port[2] + requested_pps = self.value / 100.0 * self.line_rate_to_pps() + self._totals_and_pps = rx_total, tx_total, requested_pps return self._totals_and_pps @property def rx_total(self): - return self.totals_and_pps[0] + try: + ret_val = self.totals_and_pps[0] + except (AttributeError, ValueError, TypeError, LookupError): + ret_val = 0 + return ret_val @property def tx_total(self): - return self.totals_and_pps[1] + try: + ret_val = self.totals_and_pps[1] + except (AttributeError, ValueError, TypeError, LookupError): + ret_val = 0 + return ret_val @property def requested_pps(self): - return self.totals_and_pps[2] + try: + ret_val = self.totals_and_pps[2] + except (AttributeError, ValueError, TypeError, LookupError): + ret_val = 0 + return ret_val @property def samples(self): samples = {} ports = [] - port_names = [] + port_names = {} for port_name, port_num in self.vnfd_helper.ports_iter(): ports.append(port_num) - port_names.append(port_name) - - results = self.sut.multi_port_stats(ports) - for result in results: - port_num = result[0] - samples[port_names[port_num]] = { - "in_packets": result[1], - "out_packets": result[2]} + port_names[port_num] = port_name + + ok = False + timeout = time.time() + constants.RETRY_TIMEOUT + while not ok: + ok, results = self.sut.multi_port_stats(ports) + if time.time() > timeout: + break + if ok: + for result in results: + port_num = result[0] + try: + samples[port_names[port_num]] = { + "in_packets": result[1], + "out_packets": result[2]} + except (IndexError, KeyError): + pass return samples def __enter__(self): @@ -1902,3 +2137,15 @@ class ProxlwAFTRProfileHelper(ProxProfileHelper): data_helper.latency = self.get_latency() return data_helper.result_tuple, data_helper.samples + + +class ProxIrqProfileHelper(ProxProfileHelper): + + __prox_profile_type__ = "IRQ Query" + + def __init__(self, resource_helper): + super(ProxIrqProfileHelper, self).__init__(resource_helper) + self._cores_tuple = None + self._ports_tuple = None + self.step_delta = 5 + self.step_time = 0.5 |