diff options
author | Ross Brattain <ross.b.brattain@intel.com> | 2017-04-24 23:25:48 +0200 |
---|---|---|
committer | Ross Brattain <ross.b.brattain@intel.com> | 2017-08-14 16:22:51 -0700 |
commit | a4d2958ddb727c2aaf46956cbdbf7718bfd0be89 (patch) | |
tree | dcd1293eb6306fd1a955018c26d59e7e0293c864 /yardstick/network_services/vnf_generic/vnf | |
parent | fb57af27e97bbd27e5eeeeddc766518e3c700f3c (diff) |
PROX VNF and TG
PROX was added to samplevnf project
https://git.opnfv.org/samplevnf/tree/VNFs/DPPD-PROX
JIRA: YARDSTICK-638
Change-Id: If9875b1130c6bed87deb8720b0d8b28ede9289d9
Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf')
5 files changed, 1327 insertions, 0 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/iniparser.py b/yardstick/network_services/vnf_generic/vnf/iniparser.py new file mode 100644 index 000000000..996441264 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/iniparser.py @@ -0,0 +1,177 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class ParseError(Exception): + def __init__(self, message, line_no, line): + self.msg = message + self.line = line + self.line_no = line_no + + def __str__(self): + return 'at line %d, %s: %r' % (self.line_no, self.msg, self.line) + + +class BaseParser(object): + + PARSE_EXC = ParseError + + def __init__(self): + super(BaseParser, self).__init__() + self.line_no = 0 + + def _assignment(self, key, value): + self.assignment(key, value) + return None, [] + + def _get_section(self, line): + if not line.endswith(']'): + return self.error_no_section_end_bracket(line) + if len(line) <= 2: + return self.error_no_section_name(line) + + return line[1:-1] + + def _split_key_value(self, line): + colon = line.find(':') + equal = line.find('=') + if colon < 0 and equal < 0: + return self.error_invalid_assignment(line) + + if colon < 0 or (0 <= equal < colon): + key, value = line[:equal], line[equal + 1:] + else: + key, value = line[:colon], line[colon + 1:] + + value = value.strip() + if value and value[0] == value[-1] and value.startswith(("\"", "'")): + value = value[1:-1] + return key.strip(), [value] + + def _single_line_parse(self, line, key, value): + self.line_no += 1 + + if line.startswith(('#', ';')): + self.comment(line[1:].strip()) + return key, value + + active, _, comment = line.partition(';') + self.comment(comment.strip()) + + if not active: + # Blank line, ends multi-line values + if key: + key, value = self._assignment(key, value) + return key, value + + if active.startswith((' ', '\t')): + # Continuation of previous assignment + if key is None: + return self.error_unexpected_continuation(line) + + value.append(active.lstrip()) + return key, value + + if key: + # Flush previous assignment, if any + key, value = self._assignment(key, value) + + if active.startswith('['): + # Section start + section = self._get_section(active) + if section: + self.new_section(section) + + else: + key, value = self._split_key_value(active) + if not key: + return self.error_empty_key(line) + + return key, value + + def parse(self, line_iter=None): + if line_iter is None: + return + + key = None + value = [] + + for line in line_iter: + key, value = self._single_line_parse(line, key, value) + + if key: + # Flush previous assignment, if any + self._assignment(key, value) + + def assignment(self, key, value): + """Called when a full assignment is parsed.""" + raise NotImplementedError() + + def new_section(self, section): + """Called when a new section is started.""" + raise NotImplementedError() + + def comment(self, comment): + """Called when a comment is parsed.""" + pass + + def make_parser_error(self, template, line): + raise self.PARSE_EXC(template, self.line_no, line) + + def error_invalid_assignment(self, line): + self.make_parser_error("No ':' or '=' found in assignment", line) + + def error_empty_key(self, line): + self.make_parser_error('Key cannot be empty', line) + + def error_unexpected_continuation(self, line): + self.make_parser_error('Unexpected continuation line', line) + + def error_no_section_end_bracket(self, line): + self.make_parser_error('Invalid section (must end with ])', line) + + def error_no_section_name(self, line): + self.make_parser_error('Empty section name', line) + + +class ConfigParser(BaseParser): + """Parses a single config file, populating 'sections' to look like: + + {'DEFAULT': {'key': [value, ...], ...}, + ...} + """ + + def __init__(self, filename, sections): + super(ConfigParser, self).__init__() + self.filename = filename + self.sections = sections + self.section = None + + def parse(self, line_iter=None): + with open(self.filename) as f: + return super(ConfigParser, self).parse(f) + + def new_section(self, section): + self.section = section + self.sections.setdefault(self.section, []) + + def assignment(self, key, value): + if not self.section: + raise self.error_no_section() + + value = '\n'.join(value) + self.sections[self.section].append([key, value]) + + def error_no_section(self): + self.make_parser_error('Section must be started before assignment', '') diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py new file mode 100644 index 000000000..dfed45aa4 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -0,0 +1,963 @@ +# Copyright (c) 2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import absolute_import + +import array +import operator +import logging +import os +import re +import select +import socket +from collections import OrderedDict, namedtuple +import time +from contextlib import contextmanager +from itertools import repeat, chain + +from six.moves import zip, StringIO + +from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file +from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings +from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser +from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper +from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper + +PROX_PORT = 8474 + +LOG = logging.getLogger(__name__) + +TEN_GIGABIT = 1e10 +BITS_PER_BYTE = 8 +RETRY_SECONDS = 60 +RETRY_INTERVAL = 1 + +CONFIGURATION_OPTIONS = ( + # dict key section key default value + ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'), + ('testDuration', 'general', 'test_duration', 5.0), + ('testPrecision', 'general', 'test_precision', 1.0), + ('tests', 'general', 'tests', None), + ('toleratedLoss', 'general', 'tolerated_loss', 0.0), + + ('logFile', 'logging', 'file', 'dats.log'), + ('logDateFormat', 'logging', 'datefmt', None), + ('logLevel', 'logging', 'level', 'INFO'), + ('logOverwrite', 'logging', 'overwrite', 1), + + ('testerIp', 'tester', 'ip', None), + ('testerUser', 'tester', 'user', 'root'), + ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'), + ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'), + ('testerProxDir', 'tester', 'prox_dir', '/root/prox'), + ('testerSocketId', 'tester', 'socket_id', 0), + + ('sutIp', 'sut', 'ip', None), + ('sutUser', 'sut', 'user', 'root'), + ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'), + ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'), + ('sutProxDir', 'sut', 'prox_dir', '/root/prox'), + ('sutSocketId', 'sut', 'socket_id', 0), +) + + +class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')): + + CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?") + + def __new__(cls, *args): + try: + matches = cls.CORE_RE.search(str(args[0])) + if matches: + args = matches.groups() + + return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), int(args[1]), + 'h' if args[2] else '') + + except (AttributeError, TypeError, IndexError, ValueError): + raise ValueError('Invalid core spec {}'.format(args)) + + def is_hyperthread(self): + return self.hyperthread == 'h' + + @property + def index(self): + return int(self.is_hyperthread()) + + def find_in_topology(self, cpu_topology): + try: + socket_core_match = cpu_topology[self.socket_id][self.core_id] + sorted_match = sorted(socket_core_match.values()) + return sorted_match[self.index][0] + except (KeyError, IndexError): + template = "Core {}{} on socket {} does not exist" + raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id)) + + +class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')): + + def __new__(cls, *args): + try: + assert args[0] is not str(args[0]) + args = tuple(args[0]) + except (AssertionError, IndexError, TypeError): + pass + + return super(TotStatsTuple, cls).__new__(cls, *args) + + +class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,' + 'delta_tx,delta_tsc,' + 'latency,rx_total,tx_total,pps')): + + @property + def pkt_loss(self): + try: + return 1e2 * self.drop_total / float(self.tx_total) + except ZeroDivisionError: + return 100.0 + + @property + def mpps(self): + # calculate the effective throughput in Mpps + return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6 + + @property + def can_be_lost(self): + return int(self.tx_total * self.tolerated / 1e2) + + @property + def drop_total(self): + return self.tx_total - self.rx_total + + @property + def success(self): + return self.drop_total <= self.can_be_lost + + def get_samples(self, pkt_size, pkt_loss=None): + if pkt_loss is None: + pkt_loss = self.pkt_loss + + latency_keys = [ + "LatencyMin", + "LatencyMax", + "LatencyAvg", + ] + + samples = { + "Throughput": self.mpps, + "DropPackets": pkt_loss, + "CurrentDropPackets": pkt_loss, + "TxThroughput": self.pps / 1e6, + "RxThroughput": self.mpps, + "PktSize": pkt_size, + } + + samples.update((key, value) for key, value in zip(latency_keys, self.latency)) + return samples + + def log_data(self, logger=None): + if logger is None: + logger = LOG + + template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)" + logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost) + logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps) + + +class PacketDump(object): + + @staticmethod + def assert_func(func, value1, value2, template=None): + assert func(value1, value2), template.format(value1, value2) + + def __init__(self, port_id, data_len, payload): + template = "Packet dump has specified length {}, but payload is {} bytes long" + self.assert_func(operator.eq, data_len, len(payload), template) + self._port_id = port_id + self._data_len = data_len + self._payload = payload + + @property + def port_id(self): + """Get the port id of the packet dump""" + return self._port_id + + @property + def data_len(self): + """Get the length of the data received""" + return self._data_len + + def __str__(self): + return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload) + + def payload(self, start=None, end=None): + """Get part of the payload as a list of ordinals. + + Returns a list of byte values, matching the contents of the packet dump. + Optional start and end parameters can be specified to retrieve only a + part of the packet contents. + + The number of elements in the list is equal to end - start + 1, so end + is the offset of the last character. + + Args: + start (pos. int): the starting offset in the payload. If it is not + specified or None, offset 0 is assumed. + end (pos. int): the ending offset of the payload. If it is not + specified or None, the contents until the end of the packet are + returned. + + Returns: + [int, int, ...]. Each int represents the ordinal value of a byte in + the packet payload. + """ + if start is None: + start = 0 + + if end is None: + end = self.data_len - 1 + + # Bounds checking on offsets + template = "Start offset must be non-negative" + self.assert_func(operator.ge, start, 0, template) + + template = "End offset must be less than {1}" + self.assert_func(operator.lt, end, self.data_len, template) + + # Adjust for splice operation: end offset must be 1 more than the offset + # of the last desired character. + end += 1 + + return self._payload[start:end] + + +class ProxSocketHelper(object): + + def __init__(self, sock=None): + """ creates new prox instance """ + super(ProxSocketHelper, self).__init__() + + if sock is None: + sock = socket.socket() + + self._sock = sock + self._pkt_dumps = [] + + def connect(self, ip, port): + """Connect to the prox instance on the remote system""" + self._sock.connect((ip, port)) + + def get_socket(self): + """ get the socket connected to the remote instance """ + return self._sock + + def _parse_socket_data(self, decoded_data, pkt_dump_only): + def get_newline_index(): + return decoded_data.find('\n', index) + + ret_str = '' + index = 0 + for newline_index in iter(get_newline_index, -1): + ret_str = decoded_data[index:newline_index] + + try: + mode, port_id, data_len = ret_str.split(',', 2) + except ValueError: + mode, port_id, data_len = None, None, None + + if mode != 'pktdump': + # Regular 1-line message. Stop reading from the socket. + LOG.debug("Regular response read") + return ret_str + + LOG.debug("Packet dump header read: [%s]", ret_str) + + # The line is a packet dump header. Parse it, read the + # packet payload, store the dump for later retrieval. + # Skip over the packet dump and continue processing: a + # 1-line response may follow the packet dump. + + data_len = int(data_len) + data_start = newline_index + 1 # + 1 to skip over \n + data_end = data_start + data_len + sub_data = decoded_data[data_start:data_end] + pkt_payload = array.array('B', (ord(v) for v in sub_data)) + pkt_dump = PacketDump(int(port_id), data_len, pkt_payload) + self._pkt_dumps.append(pkt_dump) + + if pkt_dump_only: + # Return boolean instead of string to signal + # successful reception of the packet dump. + LOG.debug("Packet dump stored, returning") + return True + + index = data_end + 1 + + return ret_str + + def get_data(self, pkt_dump_only=False, timeout=1): + """ read data from the socket """ + # This method behaves slightly differently depending on whether it is + # called to read the response to a command (pkt_dump_only = 0) or if + # it is called specifically to read a packet dump (pkt_dump_only = 1). + # + # Packet dumps look like: + # pktdump,<port_id>,<data_len>\n + # <packet contents as byte array>\n + # This means the total packet dump message consists of 2 lines instead + # of 1 line. + # + # - Response for a command (pkt_dump_only = 0): + # 1) Read response from the socket until \n (end of message) + # 2a) If the response is a packet dump header (starts with "pktdump,"): + # - Read the packet payload and store the packet dump for later + # retrieval. + # - Reset the state and restart from 1). Eventually state 2b) will + # be reached and the function will return. + # 2b) If the response is not a packet dump: + # - Return the received message as a string + # + # - Explicit request to read a packet dump (pkt_dump_only = 1): + # - Read the dump header and payload + # - Store the packet dump for later retrieval + # - Return True to signify a packet dump was successfully read + + def is_ready(): + # recv() is blocking, so avoid calling it when no data is waiting. + ready = select.select([self._sock], [], [], timeout) + return bool(ready[0]) + + status = False + ret_str = "" + for status in iter(is_ready, False): + LOG.debug("Reading from socket") + decoded_data = self._sock.recv(256).decode('utf-8') + ret_str = self._parse_socket_data(decoded_data, pkt_dump_only) + + LOG.debug("Received data from socket: [%s]", ret_str) + return ret_str if status else '' + + def put_command(self, to_send): + """ send data to the remote instance """ + LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n')) + self._sock.sendall(to_send.encode('utf-8')) + + def get_packet_dump(self): + """ get the next packet dump """ + if self._pkt_dumps: + return self._pkt_dumps.pop(0) + return None + + def stop_all_reset(self): + """ stop the remote instance and reset stats """ + LOG.debug("Stop all and reset stats") + self.stop_all() + self.reset_stats() + + def stop_all(self): + """ stop all cores on the remote instance """ + LOG.debug("Stop all") + self.put_command("stop all\n") + time.sleep(3) + + def stop(self, cores, task=''): + """ stop specific cores on the remote instance """ + LOG.debug("Stopping cores %s", cores) + self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task)) + time.sleep(3) + + def start_all(self): + """ start all cores on the remote instance """ + LOG.debug("Start all") + self.put_command("start all\n") + + def start(self, cores): + """ start specific cores on the remote instance """ + LOG.debug("Starting cores %s", cores) + self.put_command("start {}\n".format(join_non_strings(',', cores))) + time.sleep(3) + + def reset_stats(self): + """ reset the statistics on the remote instance """ + LOG.debug("Reset stats") + self.put_command("reset stats\n") + time.sleep(1) + + def _run_template_over_cores(self, template, cores, *args): + for core in cores: + self.put_command(template.format(core, *args)) + + def set_pkt_size(self, cores, pkt_size): + """ set the packet size to generate on the remote instance """ + LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size) + pkt_size -= 4 + self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size) + time.sleep(1) + + def set_value(self, cores, offset, value, length): + """ set value on the remote instance """ + msg = "Set value for core(s) %s to '%s' (length %d), offset %d" + LOG.debug(msg, cores, value, length, offset) + template = "set value {} 0 {} {} {}\n" + self._run_template_over_cores(template, cores, offset, value, length) + + def reset_values(self, cores): + """ reset values on the remote instance """ + LOG.debug("Set value for core(s) %s", cores) + self._run_template_over_cores("reset values {} 0\n", cores) + + def set_speed(self, cores, speed): + """ set speed on the remote instance """ + LOG.debug("Set speed for core(s) %s to %g", cores, speed) + self._run_template_over_cores("speed {} 0 {}\n", cores, speed) + + def slope_speed(self, cores_speed, duration, n_steps=0): + """will start to increase speed from 0 to N where N is taken from + a['speed'] for each a in cores_speed""" + # by default, each step will take 0.5 sec + if n_steps == 0: + n_steps = duration * 2 + + private_core_data = [] + step_duration = float(duration) / n_steps + for core_data in cores_speed: + target = float(core_data['speed']) + private_core_data.append({ + 'cores': core_data['cores'], + 'zero': 0, + 'delta': target / n_steps, + 'current': 0, + 'speed': target, + }) + + deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1) + for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]): + time.sleep(step_duration) + for core_data in private_core_data: + core_data['current'] = core_data[key1] + core_data[key2] + self.set_speed(core_data['cores'], core_data['current']) + + def set_pps(self, cores, pps, pkt_size): + """ set packets per second for specific cores on the remote instance """ + msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)" + LOG.debug(msg, cores, pps, pkt_size) + + # speed in percent of line-rate + speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE + self._run_template_over_cores("speed {} 0 {}\n", cores, speed) + + def lat_stats(self, cores, task=0): + """Get the latency statistics from the remote system""" + # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5 + lat_min = {} + lat_max = {} + lat_avg = {} + for core in cores: + self.put_command("lat stats {} {} \n".format(core, task)) + ret = self.get_data() + + try: + lat_min[core], lat_max[core], lat_avg[core] = \ + tuple(int(n) for n in ret.split(",")[:3]) + + except (AttributeError, ValueError, TypeError): + pass + + return lat_min, lat_max, lat_avg + + def get_all_tot_stats(self): + self.put_command("tot stats\n") + all_stats = TotStatsTuple(int(v) for v in self.get_data().split(",")) + return all_stats + + def hz(self): + return self.get_all_tot_stats().hz + + # Deprecated + # TODO: remove + def rx_stats(self, cores, task=0): + return self.core_stats(cores, task) + + def core_stats(self, cores, task=0): + """Get the receive statistics from the remote system""" + rx = tx = drop = tsc = 0 + for core in cores: + self.put_command("core stats {} {}\n".format(core, task)) + ret = self.get_data().split(",") + rx += int(ret[0]) + tx += int(ret[1]) + drop += int(ret[2]) + tsc = int(ret[3]) + return rx, tx, drop, tsc + + def port_stats(self, ports): + """get counter values from a specific port""" + tot_result = list(repeat(0, 12)) + for port in ports: + self.put_command("port_stats {}\n".format(port)) + for index, n in enumerate(self.get_data().split(',')): + tot_result[index] += int(n) + return tot_result + + @contextmanager + def measure_tot_stats(self): + start = self.get_all_tot_stats() + container = {'start_tot': start} + try: + yield container + finally: + container['end_tot'] = end = self.get_all_tot_stats() + + container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end)) + + def tot_stats(self): + """Get the total statistics from the remote system""" + stats = self.get_all_tot_stats() + return stats[:3] + + def tot_ierrors(self): + """Get the total ierrors from the remote system""" + self.put_command("tot ierrors tot\n") + recv = self.get_data().split(',') + tot_ierrors = int(recv[0]) + tsc = int(recv[0]) + return tot_ierrors, tsc + + def set_count(self, count, cores): + """Set the number of packets to send on the specified core""" + self._run_template_over_cores("count {} 0 {}\n", cores, count) + + def dump_rx(self, core_id, task_id=0, count=1): + """Activate dump on rx on the specified core""" + LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count) + self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count)) + time.sleep(1.5) # Give PROX time to set up packet dumping + + def quit(self): + self.stop_all() + self._quit() + self.force_quit() + + def _quit(self): + """ stop all cores on the remote instance """ + LOG.debug("Quit prox") + self.put_command("quit\n") + time.sleep(3) + + def force_quit(self): + """ stop all cores on the remote instance """ + LOG.debug("Force Quit prox") + self.put_command("quit_force\n") + time.sleep(3) + + +class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): + + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): + super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper) + self.dpdk_root = "/root/dpdk-17.02" + + def setup_vnf_environment(self): + super(ProxDpdkVnfSetupEnvHelper, self).setup_vnf_environment() + + # debug dump after binding + self.ssh_helper.execute("sudo {} -s".format(self.dpdk_nic_bind)) + + def rebind_drivers(self, force=True): + if force: + force = '--force ' + else: + force = '' + cmd_template = "{} {}-b {} {}" + if not self.used_drivers: + self._find_used_drivers() + for vpci, (_, driver) in self.used_drivers.items(): + self.ssh_helper.execute(cmd_template.format(self.dpdk_nic_bind, force, driver, vpci)) + + def _setup_dpdk(self): + self._setup_hugepages() + + self.ssh_helper.execute("pkill prox") + self.ssh_helper.execute("sudo modprobe uio") + + # for baremetal + self.ssh_helper.execute("sudo modprobe msr") + + # why remove?, just keep it loaded + # self.connection.execute("sudo rmmod igb_uio") + + igb_uio_path = os.path.join(self.dpdk_root, "x86_64-native-linuxapp-gcc/kmod/igb_uio.ko") + self.ssh_helper.execute("sudo insmod {}".format(igb_uio_path)) + + # quick hack to allow non-root copy + self.ssh_helper.execute("sudo chmod 0777 {}".format(self.ssh_helper.bin_path)) + + +class ProxResourceHelper(ClientResourceHelper): + + PROX_CORE_GEN_MODE = "gen" + PROX_CORE_LAT_MODE = "lat" + + PROX_MODE = "" + + LUA_PARAMETER_NAME = "" + LUA_PARAMETER_PEER = { + "gen": "sut", + "sut": "gen", + } + + WAIT_TIME = 3 + + @staticmethod + def _replace_quoted_with_value(quoted, value, count=1): + new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count) + return new_string + + @staticmethod + def _get_tx_port(section, sections): + iface_port = [-1] + for item in sections[section]: + if item[0] == "tx port": + iface_port = re.findall(r'\d+', item[1]) + # do we want the last one? + # if yes, then can we reverse? + return int(iface_port[0]) + + @staticmethod + def line_rate_to_pps(pkt_size, n_ports): + # FIXME Don't hardcode 10Gb/s + return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20) + + @staticmethod + def find_pci(pci, bound_pci): + # we have to substring match PCI bus address from the end + return any(b.endswith(pci) for b in bound_pci) + + @staticmethod + def write_prox_config(prox_config): + """ + Write an .ini-format config file for PROX + PROX does not allow a space before/after the =, so we need + a custom method + """ + out = [] + for section_name, section_value in prox_config.items(): + out.append("[{}]".format(section_name)) + for key, value in section_value: + if key == "__name__": + continue + if value is not None: + key = "=".join((key, str(value).replace('\n', '\n\t'))) + out.append(key) + return os.linesep.join(out) + + def __init__(self, setup_helper): + super(ProxResourceHelper, self).__init__(setup_helper) + self.mgmt_interface = self.vnfd_helper.mgmt_interface + self._user = self.mgmt_interface["user"] + self._ip = self.mgmt_interface["ip"] + + self.done = False + self._cpu_topology = None + self._vpci_to_if_name_map = None + self.additional_file = False + self.remote_prox_file_name = None + self.prox_config_dict = None + self.lower = None + self.upper = None + self._test_cores = None + self._latency_cores = None + + @property + def sut(self): + if not self.client: + self.client = ProxSocketHelper() + return self.client + + @property + def cpu_topology(self): + if not self._cpu_topology: + stdout = self.ssh_helper.execute("cat /proc/cpuinfo")[1] + self._cpu_topology = SocketTopology.parse_cpuinfo(stdout) + return self._cpu_topology + + @property + def vpci_to_if_name_map(self): + if self._vpci_to_if_name_map is None: + self._vpci_to_if_name_map = { + interface["virtual-interface"]["vpci"]: interface["name"] + for interface in self.vnfd_helper.interfaces + } + return self._vpci_to_if_name_map + + @property + def test_cores(self): + if not self._test_cores: + self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE) + return self._test_cores + + @property + def latency_cores(self): + if not self._latency_cores: + self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE) + return self._latency_cores + + def run_traffic(self, traffic_profile): + self.lower = 0.0 + self.upper = 100.0 + + traffic_profile.init(self._queue) + # this frees up the run_traffic loop + self.client_started.value = 1 + + while not self._terminated.value: + # move it all to traffic_profile + self._run_traffic_once(traffic_profile) + + def _run_traffic_once(self, traffic_profile): + traffic_profile.execute(self) + if traffic_profile.done: + self._queue.put({'done': True}) + LOG.debug("tg_prox done") + self._terminated.value = 1 + + def start_collect(self): + pass + + def terminate(self): + super(ProxResourceHelper, self).terminate() + self.ssh_helper.execute('sudo pkill prox') + self.setup_helper.rebind_drivers() + + def get_process_args(self): + task_path = self.scenario_helper.task_path + options = self.scenario_helper.options + + prox_args = options['prox_args'] + prox_path = options['prox_path'] + config_path = options['prox_config'] + + config_file = os.path.basename(config_path) + config_path = find_relative_file(config_path, task_path) + + try: + prox_file_config_path = options['prox_files'] + prox_file_file = os.path.basename(prox_file_config_path) + prox_file_config_path = find_relative_file(prox_file_config_path, task_path) + self.remote_prox_file_name = self.copy_to_target(prox_file_config_path, prox_file_file) + self.additional_file = True + except: + self.additional_file = False + + self.prox_config_dict = self.generate_prox_config_file(config_path) + + remote_path = self.upload_prox_config(config_file, self.prox_config_dict) + return prox_args, prox_path, remote_path + + def up_post(self): + return self.sut # force connection + + def execute(self, cmd, *args, **kwargs): + func = getattr(self.sut, cmd, None) + if func: + return func(*args, **kwargs) + + def copy_to_target(self, config_file_path, prox_file): + remote_path = os.path.join("/tmp", prox_file) + self.ssh_helper.put(config_file_path, remote_path) + return remote_path + + def upload_prox_config(self, config_file, prox_config_dict): + # prox can't handle spaces around ' = ' so use custom method + out = StringIO(self.write_prox_config(prox_config_dict)) + out.seek(0) + remote_path = os.path.join("/tmp", config_file) + self.ssh_helper.put_file_obj(out, remote_path) + + return remote_path + + @contextmanager + def traffic_context(self, pkt_size, value): + self.sut.stop_all() + self.sut.reset_stats() + self.sut.set_pkt_size(self.test_cores, pkt_size) + self.sut.set_speed(self.test_cores, value) + self.sut.start_all() + try: + yield + finally: + self.sut.stop_all() + + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): + # do this assert in init? unless we expect interface count to + # change from one run to another run... + interfaces = self.vnfd_helper.interfaces + interface_count = len(interfaces) + assert interface_count in {2, 4}, \ + "Invalid no of ports, 2 or 4 ports only supported at this time" + + with self.traffic_context(pkt_size, value): + # Getting statistics to calculate PPS at right speed.... + tsc_hz = float(self.sut.hz()) + time.sleep(2) + with self.sut.measure_tot_stats() as data: + time.sleep(duration) + + # Get stats before stopping the cores. Stopping cores takes some time + # and might skew results otherwise. + latency = self.get_latency() + + deltas = data['delta'] + rx_total, tx_total = self.sut.port_stats(range(interface_count))[6:8] + pps = value / 100.0 * self.line_rate_to_pps(pkt_size, interface_count) + + result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx, + deltas.tsc, latency, rx_total, tx_total, pps) + + result.log_data() + return result + + def get_cores(self, mode): + cores = [] + for section_name, section_data in self.prox_config_dict.items(): + if section_name.startswith("core"): + for index, item in enumerate(section_data): + if item[0] == "mode" and item[1] == mode: + core = CoreSocketTuple(section_name).find_in_topology(self.cpu_topology) + cores.append(core) + return cores + + def upload_prox_lua(self, config_dir, prox_config_dict): + # we could have multiple lua directives + lau_dict = prox_config_dict.get('lua', {}) + find_iter = (re.findall('\("([^"]+)"\)', k) for k in lau_dict) + lua_file = next((found[0] for found in find_iter if found), None) + if not lua_file: + return "" + + out = self.generate_prox_lua_file() + remote_path = os.path.join(config_dir, lua_file) + return self.put_string_to_file(out, remote_path) + + def put_string_to_file(self, s, remote_path): + self.ssh_helper.run("cat > '{}'".format(remote_path), stdin=s) + return remote_path + + def generate_prox_lua_file(self): + p = OrderedDict() + ext_intf = self.vnfd_helper.interfaces + lua_param = self.LUA_PARAMETER_NAME + for intf in ext_intf: + peer = self.LUA_PARAMETER_PEER[lua_param] + port_num = intf["virtual-interface"]["dpdk_port_num"] + local_ip = intf["local_ip"] + dst_ip = intf["dst_ip"] + local_ip_hex = ip_to_hex(local_ip, separator=' ') + dst_ip_hex = ip_to_hex(dst_ip, separator=' ') + p.update([ + ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex), + ("{}_ip_port_{}".format(lua_param, port_num), local_ip), + ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex), + ("{}_ip_port_{}".format(peer, port_num), dst_ip), + ]) + lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items())) + return lua + + def generate_prox_config_file(self, config_path): + sections = {} + prox_config = ConfigParser(config_path, sections) + prox_config.parse() + + # Ensure MAC is set "hardware" + ext_intf = self.vnfd_helper.interfaces + for intf in ext_intf: + port_num = intf["virtual-interface"]["dpdk_port_num"] + section_name = "port {}".format(port_num) + for index, section_data in enumerate(sections.get(section_name, [])): + if section_data[0] == "mac": + sections[section_name][index][1] = "hardware" + + # search for dest mac + for section_name, section_data in sections.items(): + for index, section_attr in enumerate(section_data): + if section_attr[0] != "dst mac": + continue + + tx_port_no = self._get_tx_port(section_name, sections) + if tx_port_no == -1: + raise Exception("Failed ..destination MAC undefined") + + dst_mac = ext_intf[tx_port_no]["virtual-interface"]["dst_mac"] + section_attr[1] = dst_mac + + # if addition file specified in prox config + if self.additional_file: + remote_name = self.remote_prox_file_name + for section_data in sections.values(): + for index, section_attr in enumerate(section_data): + try: + if section_attr[1].startswith("dofile"): + new_string = self._replace_quoted_with_value(section_attr[1], + remote_name) + section_attr[1] = new_string + except: + pass + + return sections + + def get_latency(self): + """ + :return: return lat_min, lat_max, lat_avg + :rtype: list + """ + if self._latency_cores: + return self.sut.lat_stats(self._latency_cores) + return [] + + def _get_logical_if_name(self, vpci): + return self._vpci_to_if_name_map[vpci] + + def _connect(self, client=None): + """Run and connect to prox on the remote system """ + # De-allocating a large amount of hugepages takes some time. If a new + # PROX instance is started immediately after killing the previous one, + # it might not be able to allocate hugepages, because they are still + # being freed. Hence the -w switch. + # self.connection.execute("sudo killall -w Prox 2>/dev/null") + # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t + # -f ./handle_none-4.cfg" + # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir + + # "; " \ + # + "export RTE_TARGET=" + self._dpdk_target + ";" \ + # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50; + # sudo " \ + # + "./build/Prox " + prox_args + # log.debug("Starting PROX with command [%s]", prox_cmd) + # thread.start_new_thread(self.ssh_check_quit, (self, self._user, + # self._ip, prox_cmd)) + if client is None: + client = ProxSocketHelper() + + # try connecting to Prox for 60s + for _ in range(RETRY_SECONDS): + time.sleep(RETRY_INTERVAL) + try: + client.connect(self._ip, PROX_PORT) + except (socket.gaierror, socket.error): + continue + else: + return client + + msg = "Failed to connect to prox, please check if system {} accepts connections on port {}" + raise Exception(msg.format(self._ip, PROX_PORT)) diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py new file mode 100644 index 000000000..88911c3fc --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py @@ -0,0 +1,117 @@ +# Copyright (c) 2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import multiprocessing +import os +import time + +from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper +from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper +from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxDpdkVnfSetupEnvHelper +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF + +LOG = logging.getLogger(__name__) + + +class ProxApproxVnf(SampleVNF): + + APP_NAME = 'PROX' + APP_WORD = 'PROX' + PROX_MODE = "Workload" + VNF_PROMPT = "PROX started" + LUA_PARAMETER_NAME = "sut" + + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = ProxDpdkVnfSetupEnvHelper + + if resource_helper_type is None: + resource_helper_type = ProxResourceHelper + + super(ProxApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) + self._result = {} + self._terminated = multiprocessing.Value('i', 0) + self._queue = multiprocessing.Value('i', 0) + + def instantiate(self, scenario_cfg, context_cfg): + LOG.info("printing .........prox instantiate ") + + self.scenario_helper.scenario_cfg = scenario_cfg + + # this won't work we need 1GB hugepages at boot + self.setup_helper.setup_vnf_environment() + + # self.connection.run("cat /proc/cpuinfo") + + prox_args, prox_path, remote_path = self.resource_helper.get_process_args() + + self.q_in = multiprocessing.Queue() + self.q_out = multiprocessing.Queue() + self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, "PROX started") + self._vnf_process = multiprocessing.Process(target=self._run_prox, + args=(remote_path, prox_path, prox_args)) + self._vnf_process.start() + + def _vnf_up_post(self): + self.resource_helper.up_post() + + def _run_prox(self, file_wrapper, config_path, prox_path, prox_args): + # This runs in a different process and should not share an SSH connection + # with the rest of the object + self.ssh_helper.drop_connection() + + time.sleep(self.WAIT_TIME) + + args = " ".join(" ".join([k, v if v else ""]) for k, v in prox_args.items()) + + cmd_template = "sudo bash -c 'cd {}; {} -o cli {} -f {} '" + prox_cmd = cmd_template.format(os.path.dirname(prox_path), prox_path, args, config_path) + + LOG.debug(prox_cmd) + self.ssh_helper.run(prox_cmd, stdin=file_wrapper, stdout=file_wrapper, + keep_stdin_open=True, pty=False) + + def vnf_execute(self, cmd, wait_time=2): + # try to execute with socket commands + self.resource_helper.execute(cmd) + + def collect_kpi(self): + if self.resource_helper is None: + result = { + "packets_in": 0, + "packets_dropped": 0, + "packets_fwd": 0, + "collect_stats": {"core": {}}, + } + return result + + if len(self.vnfd_helper.interfaces) not in {2, 4}: + raise RuntimeError("Failed ..Invalid no of ports .. " + "2 or 4 ports only supported at this time") + + port_stats = self.resource_helper.execute('port_stats', self.vnfd_helper.interfaces) + rx_total = port_stats[6] + tx_total = port_stats[7] + result = { + "packets_in": tx_total, + "packets_dropped": (tx_total - rx_total), + "packets_fwd": rx_total, + "collect_stats": self.resource_helper.collect_kpi(), + } + return result + + def _tear_down(self): + self.setup_helper.rebind_drivers() diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 90053bc36..60979ebd2 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -165,6 +165,7 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.bound_pci = None self._dpdk_nic_bind = None self.socket = None + self.used_drivers = None @property def dpdk_nic_bind(self): diff --git a/yardstick/network_services/vnf_generic/vnf/tg_prox.py b/yardstick/network_services/vnf_generic/vnf/tg_prox.py new file mode 100644 index 000000000..b4568bf4b --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_prox.py @@ -0,0 +1,69 @@ +# Copyright (c) 2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function, absolute_import + +import logging + + +from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxDpdkVnfSetupEnvHelper +from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen + +LOG = logging.getLogger(__name__) + + +class ProxTrafficGen(SampleVNFTrafficGen): + + PROX_MODE = "Traffic Gen" + LUA_PARAMETER_NAME = "gen" + + @staticmethod + def _sort_vpci(vnfd): + """ + + :param vnfd: vnfd.yaml + :return: trex_cfg.yaml file + """ + + def key_func(interface): + return interface["virtual-interface"]["vpci"], interface["name"] + + ext_intf = vnfd["vdu"][0]["external-interface"] + return sorted(ext_intf, key=key_func) + + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = ProxDpdkVnfSetupEnvHelper + + if resource_helper_type is None: + resource_helper_type = ProxResourceHelper + + super(ProxTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) + self._result = {} + # for some reason + self.vpci_if_name_ascending = self._sort_vpci(vnfd) + self._traffic_process = None + + def listen_traffic(self, traffic_profile): + pass + + def terminate(self): + super(ProxTrafficGen, self).terminate() + self.resource_helper.terminate() + if self._traffic_process: + self._traffic_process.terminate() + self.ssh_helper.execute("pkill prox") + self.resource_helper.rebind_drivers() |