From 1641271a83022cca9d4548839c0b54debf9c520f Mon Sep 17 00:00:00 2001 From: Christian Trautman Date: Wed, 4 May 2016 15:29:22 -0400 Subject: xena_cont: Add Xena continuous traffic functionality * Adds XenaDriver.py module to allow direct communication to Xena chassis through socket API and collect stats. * Adds implementation into xena.py for continuous traffic flow. JIRA: VSPERF-262 Change-Id: I6f975bc205e3c954215eb16466f11298e3fcdfaf Signed-off-by: Christian Trautman --- tools/pkt_gen/xena/XenaDriver.py | 1129 ++++++++++++++++++++++++++++++++++++++ tools/pkt_gen/xena/xena.py | 330 +++++++++-- 2 files changed, 1425 insertions(+), 34 deletions(-) create mode 100644 tools/pkt_gen/xena/XenaDriver.py (limited to 'tools') diff --git a/tools/pkt_gen/xena/XenaDriver.py b/tools/pkt_gen/xena/XenaDriver.py new file mode 100644 index 00000000..aa8443c9 --- /dev/null +++ b/tools/pkt_gen/xena/XenaDriver.py @@ -0,0 +1,1129 @@ +# Copyright 2016 Red Hat Inc & Xena Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is a port of code by Xena and Flavios ported to python 3 compatibility. +# Credit given to Xena and Flavio for providing most of the logic of this code. +# The code has changes for PEP 8 and python 3 conversion. Added Stat classes +# for better scaling of future requirements. Also added calculation functions +# for line rate to align within VSPerf project. +# Flavios xena libraries available at https://github.com/fleitner/XenaPythonLib + +# Contributors: +# Flavio Leitner, Red Hat Inc. +# Dan Amzulescu, Xena Networks +# Christian Trautman, Red Hat Inc. + +""" +Xena Socket API Driver module for communicating directly with Xena system +through socket commands and returning different statistics. +""" +import locale +import logging +import socket +import struct +import sys +import threading +import time + +# Xena Socket Commands +CMD_CLEAR_RX_STATS = 'pr_clear' +CMD_CLEAR_TX_STATS = 'pt_clear' +CMD_COMMENT = ';' +CMD_CREATE_STREAM = 'ps_create' +CMD_DELETE_STREAM = 'ps_delete' +CMD_GET_PORT_SPEED = 'p_speed ?' +CMD_GET_PORT_SPEED_REDUCTION = 'p_speedreduction ?' +CMD_GET_RX_STATS_PER_TID = 'pr_tpldtraffic' +CMD_GET_STREAM_DATA = 'pt_stream' +CMD_GET_STREAMS_PER_PORT = 'ps_indices' +CMD_GET_TID_PER_STREAM = 'ps_tpldid' +CMD_GET_TX_STATS_PER_STREAM = 'pt_stream' +CMD_GET_RX_STATS = 'pr_all ?' +CMD_GET_TX_STATS = 'pt_all ?' +CMD_INTERFRAME_GAP = 'p_interframegap' +CMD_LOGIN = 'c_logon' +CMD_LOGOFF = 'c_logoff' +CMD_OWNER = 'c_owner' +CMD_PORT = ';Port:' +CMD_PORT_IP = 'p_ipaddress' +CMD_RESERVE = 'p_reservation reserve' +CMD_RELEASE = 'p_reservation release' +CMD_RELINQUISH = 'p_reservation relinquish' +CMD_RESET = 'p_reset' +CMD_SET_PORT_TIME_LIMIT = 'p_txtimelimit' +CMD_SET_STREAM_HEADER_PROTOCOL = 'ps_headerprotocol' +CMD_SET_STREAM_ON_OFF = 'ps_enable' +CMD_SET_STREAM_PACKET_HEADER = 'ps_packetheader' +CMD_SET_STREAM_PACKET_LENGTH = 'ps_packetlength' +CMD_SET_STREAM_PACKET_LIMIT = 'ps_packetlimit' +CMD_SET_STREAM_PACKET_PAYLOAD = 'ps_payload' +CMD_SET_STREAM_RATE_FRACTION = 'ps_ratefraction' +CMD_SET_STREAM_TEST_PAYLOAD_ID = 'ps_tpldid' +CMD_SET_TPLD_MODE = 'p_tpldmode' +CMD_START_TRAFFIC = 'p_traffic on' +CMD_STOP_TRAFFIC = 'p_traffic off' +CMD_STREAM_MODIFIER = 'ps_modifier' +CMD_STREAM_MODIFIER_COUNT = 'ps_modifiercount' +CMD_STREAM_MODIFIER_RANGE = 'ps_modifierrange' +CMD_VERSION = 'c_versionno ?' + +_LOCALE = locale.getlocale()[1] +_LOGGER = logging.getLogger(__name__) + + +class SimpleSocket(object): + """ + Socket class + """ + def __init__(self, hostname, port=5025, timeout=1): + """Constructor + :param hostname: hostname or ip as string + :param port: port number to use for socket as int + :param timeout: socket timeout as int + :return: SimpleSocket object + """ + self.hostname = hostname + try: + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.settimeout(timeout) + self.sock.connect((hostname, port)) + except socket.error as msg: + _LOGGER.error( + "Cannot connect to Xena Socket at %s", hostname) + _LOGGER.error("Exception : %s", msg) + sys.exit(1) + + def __del__(self): + """Deconstructor + :return: + """ + self.sock.close() + + def ask(self, cmd): + """ Send the command over the socket + :param cmd: cmd as string + :return: byte utf encoded return value from socket + """ + cmd += '\n' + try: + self.sock.send(cmd.encode('utf-8')) + return self.sock.recv(1024) + except OSError: + return '' + + def read_reply(self): + """ Get the response from the socket + :return: Return the reply + """ + reply = self.sock.recv(1024) + if reply.find("---^".encode('utf-8')) != -1: + # read again the syntax error msg + reply = self.sock.recv(1024) + return reply + + def send_command(self, cmd): + """ Send the command specified over the socket + :param cmd: Command to send as string + :return: None + """ + cmd += '\n' + self.sock.send(cmd.encode('utf-8')) + + def set_keep_alive(self): + """ Set the keep alive for the socket + :return: None + """ + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + +class KeepAliveThread(threading.Thread): + """ + Keep alive socket class + """ + message = '' + + def __init__(self, connection, interval=10): + """ Constructor + :param connection: Socket for keep alive + :param interval: interval in seconds to send keep alive + :return: KeepAliveThread object + """ + threading.Thread.__init__(self) + self.connection = connection + self.interval = interval + self.finished = threading.Event() + self.setDaemon(True) + _LOGGER.debug( + 'Xena Socket keep alive thread initiated, interval ' + + '{} seconds'.format(self.interval)) + + def stop(self): + """ Thread stop. See python thread docs for more info + :return: None + """ + self.finished.set() + self.join() + + def run(self): + """ Thread start. See python thread docs for more info + :return: None + """ + while not self.finished.isSet(): + self.finished.wait(self.interval) + self.connection.ask(self.message) + + +class XenaSocketDriver(SimpleSocket): + """ + Xena socket class + """ + reply_ok = '' + + def __init__(self, hostname, port=22611): + """ Constructor + :param hostname: Hostname or ip as string + :param port: port to use as int + :return: XenaSocketDriver object + """ + SimpleSocket.__init__(self, hostname=hostname, port=port) + SimpleSocket.set_keep_alive(self) + self.access_semaphor = threading.Semaphore(1) + + def ask(self, cmd): + """ Send the command over the socket in a thread safe manner + :param cmd: Command to send + :return: reply from socket + """ + self.access_semaphor.acquire() + reply = SimpleSocket.ask(self, cmd) + self.access_semaphor.release() + return reply + + def ask_verify(self, cmd): + """ Send the command over the socket in a thread safe manner and + verify the response is good. + :param cmd: Command to send + :return: Boolean True if command response is good, False otherwise + """ + resp = self.ask(cmd).decode(_LOCALE).strip('\n') + _LOGGER.info('[ask_verify] %s', resp) + if resp == self.reply_ok: + return True + return False + + def disconnect(self): + """ + Close the socket connection + :return: None + """ + self.sock.close() + + def send_command(self, cmd): + """ Send the command over the socket with no return + :param cmd: Command to send + :return: None + """ + self.access_semaphor.acquire() + SimpleSocket.send_command(self, cmd) + self.access_semaphor.release() + + def send_query_replies(self, cmd): + """ Send the command over the socket and wait for all replies and return + the lines as a list + :param cmd: Command to send + :return: Response from command as list + """ + # send the command followed by cmd SYNC to find out + # when the last reply arrives. + self.send_command(cmd.strip('\n')) + self.send_command('SYNC') + replies = [] + self.access_semaphor.acquire() + msg = SimpleSocket.read_reply(self).decode(_LOCALE) + msgleft = '' + while True: + if '\n' in msg: + (reply, msgleft) = msg.split('\n', 1) + # check for syntax problems + if reply.rfind('Syntax') != -1: + self.access_semaphor.release() + return [] + + if reply.rfind('') == 0: + + self.access_semaphor.release() + return replies + + replies.append(reply + '\n') + msg = msgleft + else: + # more bytes to come + msgnew = SimpleSocket.read_reply(self).decode(_LOCALE) + msg = msgleft + msgnew + + +class XenaManager(object): + """ + Manager class for port and socket functions + """ + def __init__(self, socketDriver, user='', password='xena'): + """Constructor + + Establish a connection to Xena using a ``driver`` with the ``password`` + supplied. + + Attributes: + :param socketDriver: XenaSocketDriver connection object + :param password: Password to the Xena traffic generator + :returns: XenaManager object + """ + self.driver = socketDriver + self.ports = list() + self.keep_alive_thread = KeepAliveThread(self.driver) + + if self.logon(password): + _LOGGER.info('Connected to Xena at %s', self.driver.hostname) + else: + _LOGGER.error('Failed to logon to Xena at %s', self.driver.hostname) + return + + self.set_owner(user) + + def disconnect(self): + """ Release ports and disconnect from chassis. + """ + for module_port in self.ports: + module_port.release_port() + self.ports = [] + self.logoff() + self.keep_alive_thread.stop() + + def add_module_port(self, module, port): + """Factory for Xena Ports + + :param module: String or int of module + :param port: String or int of port + :return: XenaPort object if success, None if port already added + """ + xenaport = XenaPort(self, module, port) + if xenaport in self.ports: + return None + else: + self.ports.append(xenaport) + return xenaport + + def get_module_port(self, module, port): + """Return the Xena Port object if available + :param module: module number as int or str + :param port: port number as int or str + :return: XenaPort object or None if not found + """ + for por in self.ports: + if por.port == str(port) and por.module == str(module): + return por + return None + + def get_version(self): + """ + Get the version from the chassis + :return: versions of server and driver as string + """ + res = self.driver.ask(make_manager_command( + CMD_VERSION, '')).decode(_LOCALE) + res = res.rstrip('\n').split() + return "Server: {} Driver: {}".format(res[1], res[2]) + + def logoff(self): + """ + Logoff from the Xena chassis + :return: Boolean True if response OK, False if error. + """ + return self.driver.ask_verify(make_manager_command(CMD_LOGOFF)) + + def logon(self, password): + """Login to the Xena traffic generator using the ``password`` supplied. + + :param password: string of password + :return: Boolean True if response OK, False if error. + """ + self.keep_alive_thread.start() + return self.driver.ask_verify(make_manager_command(CMD_LOGIN, password)) + + def set_owner(self, username): + """Set the ports owner. + :return: Boolean True if response OK, False if error. + """ + return self.driver.ask_verify(make_manager_command(CMD_OWNER, username)) + + +class XenaPort(object): + """ + Xena Port emulator class + """ + def __init__(self, manager, module, port): + """Constructor + + :param manager: XenaManager object + :param module: Module as string or int of module to use + :param port: Port as string or int of port to use + :return: XenaPort object + """ + self._manager = manager + self._module = str(module) + self._port = str(port) + self._streams = list() + + @property + def manager(self): + """Property for manager attribute + :return: manager object + """ + return self._manager + + @property + def module(self): + """Property for module attribute + :return: module value as string + """ + return self._module + + @property + def port(self): + """Property for port attribute + :return: port value as string + """ + return self._port + + def port_string(self): + """String builder with attributes + :return: String of module port for command sequence + """ + stringify = "{}/{}".format(self._module, self._port) + return stringify + + def add_stream(self): + """Add a stream to the port. + :return: XenaStream object, None if failure + """ + identifier = len(self._streams) + stream = XenaStream(self, identifier) + if self._manager.driver.ask_verify(make_stream_command( + CMD_CREATE_STREAM, '', stream)): + self._streams.append(stream) + return stream + else: + _LOGGER.error("Error during stream creation") + return None + + def clear_stats(self, rx_clear=True, tx_clear=True): + """Clear the port stats + + :param rx_clear: Boolean if rx stats are to be cleared + :param tx_clear: Boolean if tx stats are to be cleared + :return: Boolean True if response OK, False if error. + """ + command = make_port_command(CMD_CLEAR_RX_STATS, self) + res1 = self._manager.driver.ask_verify(command) if rx_clear else True + command = make_port_command(CMD_CLEAR_TX_STATS, self) + res2 = self._manager.driver.ask_verify(command) if tx_clear else True + return all([res1, res2]) + + def get_effective_speed(self): + """ + Get the effective speed on the port + :return: effective speed as float + """ + port_speed = self.get_port_speed() + reduction = self.get_port_speed_reduction() + effective_speed = port_speed * (1.0 - reduction / 1000000.0) + return effective_speed + + def get_inter_frame_gap(self): + """ + Get the interframe gap and return it as string + :return: integer of interframe gap + """ + command = make_port_command(CMD_INTERFRAME_GAP + '?', self) + res = self._manager.driver.ask(command).decode(_LOCALE) + res = int(res.rstrip('\n').split(' ')[-1]) + return res + + def get_port_speed(self): + """ + Get the port speed as bits from port and return it as a int. + :return: Int of port speed + """ + command = make_port_command(CMD_GET_PORT_SPEED, self) + res = self._manager.driver.ask(command).decode(_LOCALE) + port_speed = res.split(' ')[-1].rstrip('\n') + return int(port_speed) * 1000000 + + def get_port_speed_reduction(self): + """ + Get the port speed reduction value as int + :return: Integer of port speed reduction value + """ + command = make_port_command(CMD_GET_PORT_SPEED_REDUCTION, self) + res = self._manager.driver.ask(command).decode(_LOCALE) + res = int(res.rstrip('\n').split(' ')[-1]) + return res + + def get_rx_stats(self): + """Get the rx stats and return the data as a dict. + :return: Receive stats as dictionary + """ + command = make_port_command(CMD_GET_RX_STATS, self) + rx_data = self._manager.driver.send_query_replies(command) + data = XenaRXStats(rx_data, time.time()) + return data + + def get_tx_stats(self): + """Get the tx stats and return the data as a dict. + :return: Receive stats as dictionary + """ + command = make_port_command(CMD_GET_TX_STATS, self) + tx_data = self._manager.driver.send_query_replies(command) + data = XenaTXStats(tx_data, time.time()) + return data + + def micro_tpld_disable(self): + """Disable micro TPLD and return to standard payload size + :return: Boolean if response OK, False if error + """ + command = make_port_command(CMD_SET_TPLD_MODE + ' normal', self) + return self._manager.driver.ask_verify(command) + + def micro_tpld_enable(self): + """Enable micro TPLD 6 byte payloads. + :Return Boolean if response OK, False if error + """ + command = make_port_command(CMD_SET_TPLD_MODE + ' micro', self) + return self._manager.driver.ask_verify(command) + + def release_port(self): + """Release the port + :return: Boolean True if response OK, False if error. + """ + command = make_port_command(CMD_RELEASE, self) + return self._manager.driver.ask_verify(command) + + def reserve_port(self): + """Reserve the port + :return: Boolean True if response OK, False if error. + """ + command = make_port_command(CMD_RESERVE, self) + return self._manager.driver.ask_verify(command) + + def reset_port(self): + """Reset the port + :return: Boolean True if response OK, False if error. + """ + command = make_port_command(CMD_RESET, self) + return self._manager.driver.ask_verify(command) + + def set_port_ip(self, ip_addr, cidr, gateway, wild='255'): + """ + Set the port ip address of the specific port + :param ip_addr: IP address to set to port + :param cidr: cidr number for the subnet + :param gateway: Gateway ip for port + :param wild: wildcard used for ARP and PING replies + :return: Boolean True if response OK, False if error + """ + # convert the cidr to a dot notation subnet address + subnet = socket.inet_ntoa( + struct.pack(">I", (0xffffffff << (32 - cidr)) & 0xffffffff)) + + command = make_port_command('{} {} {} {} 0.0.0.{}'.format( + CMD_PORT_IP, ip_addr, subnet, gateway, wild), self) + return self._manager.driver.ask_verify(command) + + def set_port_time_limit(self, micro_seconds): + """Set the port time limit in ms + :param micro_seconds: ms for port time limit + :return: Boolean True if response OK, False if error. + """ + command = make_port_command('{} {}'.format( + CMD_SET_PORT_TIME_LIMIT, micro_seconds), self) + return self._manager.driver.ask_verify(command) + + def traffic_off(self): + """Start traffic + :return: Boolean True if response OK, False if error. + """ + command = make_port_command(CMD_STOP_TRAFFIC, self) + return self._manager.driver.ask_verify(command) + + def traffic_on(self): + """Stop traffic + :return: Boolean True if response OK, False if error. + """ + command = make_port_command(CMD_START_TRAFFIC, self) + return self._manager.driver.ask_verify(command) + + +class XenaStream(object): + """ + Xena stream emulator class + """ + def __init__(self, xenaPort, streamID): + """Constructor + + :param xenaPort: XenaPort object + :param streamID: Stream ID as int or string + :return: XenaStream object + """ + self._xena_port = xenaPort + self._stream_id = str(streamID) + self._manager = self._xena_port.manager + self._header_protocol = None + + @property + def xena_port(self): + """Property for port attribute + :return: XenaPort object + """ + return self._xena_port + + @property + def stream_id(self): + """Property for streamID attribute + :return: streamID value as string + """ + return self._stream_id + + def enable_multistream(self, flows, layer): + """ + Basic implementation of multi stream. Enable multi stream by setting + modifiers on the stream + :param flows: Numbers of flows or end range + :param layer: layer to enable multi stream as str. Acceptable values + are L2, L3, or L4 + :return: True if success False otherwise + """ + if not self._header_protocol: + raise RuntimeError( + "Please set a protocol header before calling this method.") + + # byte offsets for setting the modifier + offsets = { + 'L2': [0, 6], + 'L3': [32, 36] if 'VLAN' in self._header_protocol else [28, 32], + 'L4': [38, 40] if 'VLAN' in self._header_protocol else [34, 36] + } + + responses = list() + if layer in offsets.keys() and flows > 0: + command = make_port_command( + CMD_STREAM_MODIFIER_COUNT + ' [{}]'.format(self._stream_id) + + ' 2', self._xena_port) + responses.append(self._manager.driver.ask_verify(command)) + command = make_port_command( + CMD_STREAM_MODIFIER + ' [{},0] {} 0xFFFF0000 INC 1'.format( + self._stream_id, offsets[layer][0]), self._xena_port) + responses.append(self._manager.driver.ask_verify(command)) + command = make_port_command( + CMD_STREAM_MODIFIER_RANGE + ' [{},0] 0 1 {}'.format( + self._stream_id, flows), self._xena_port) + responses.append(self._manager.driver.ask_verify(command)) + command = make_port_command( + CMD_STREAM_MODIFIER + ' [{},1] {} 0xFFFF0000 INC 1'.format( + self._stream_id, offsets[layer][1]), self._xena_port) + responses.append(self._manager.driver.ask_verify(command)) + command = make_port_command( + CMD_STREAM_MODIFIER_RANGE + ' [{},1] 0 1 {}'.format( + self._stream_id, flows), self._xena_port) + responses.append(self._manager.driver.ask_verify(command)) + return all(responses) # return True if they all worked + elif flows < 1: + _LOGGER.warning( + 'No flows specified in enable multistream. Bypassing...') + return False + else: + raise NotImplementedError( + "Non-implemented stream layer in method enable multistream ", + "layer=", layer) + + def get_stream_data(self): + """ + Get the response for stream data + :return: String of response for stream data info + """ + command = make_stream_command(CMD_GET_STREAM_DATA, '?', self) + res = self._manager.driver.ask(command).decode(_LOCALE) + return res + + def set_header_protocol(self, protocol_header): + """Set the header info for the packet header hex. + If the packet header contains just Ethernet and IP info then call this + method with ETHERNET IP as the protocol header. + + :param protocol_header: protocol header argument + :return: Boolean True if success, False if error + """ + command = make_stream_command( + CMD_SET_STREAM_HEADER_PROTOCOL, + protocol_header, self) + if self._manager.driver.ask_verify(command): + self._header_protocol = protocol_header + return True + else: + return False + + def set_off(self): + """Set the stream to off + :return: Boolean True if success, False if error + """ + return self._manager.driver.ask_verify(make_stream_command( + CMD_SET_STREAM_ON_OFF, 'off', self)) + + def set_on(self): + """Set the stream to on + :return: Boolean True if success, False if error + """ + return self._manager.driver.ask_verify(make_stream_command( + CMD_SET_STREAM_ON_OFF, 'on', self)) + + def set_packet_header(self, header): + """Set the stream packet header + + :param header: packet header as hex bytes + :return: Boolean True if success, False if error + """ + return self._manager.driver.ask_verify(make_stream_command( + CMD_SET_STREAM_PACKET_HEADER, header, self)) + + def set_packet_length(self, pattern_type, minimum, maximum): + """Set the pattern length with min and max values based on the pattern + type supplied + + :param pattern_type: String of pattern type, valid entries [ fixed, + butterfly, random, mix, incrementing ] + :param minimum: integer of minimum byte value + :param maximum: integer of maximum byte value + :return: Boolean True if success, False if error + """ + return self._manager.driver.ask_verify(make_stream_command( + CMD_SET_STREAM_PACKET_LENGTH, '{} {} {}'.format( + pattern_type, minimum, maximum), self)) + + def set_packet_limit(self, limit): + """Set the packet limit + + :param limit: number of packets that will be sent, use -1 to disable + :return: Boolean True if success, False if error + """ + return self._manager.driver.ask_verify(make_stream_command( + CMD_SET_STREAM_PACKET_LIMIT, limit, self)) + + def set_packet_payload(self, payload_type, hex_value): + """Set the payload to the hex value based on the payload type + + :param payload_type: string of the payload type, valid entries [ pattern, + incrementing, prbs ] + :param hex_value: hex string of valid hex + :return: Boolean True if success, False if error + """ + return self._manager.driver.ask_verify(make_stream_command( + CMD_SET_STREAM_PACKET_PAYLOAD, '{} {}'.format( + payload_type, hex_value), self)) + + def set_rate_fraction(self, fraction): + """Set the rate fraction + + :param fraction: fraction for the stream + :return: Boolean True if success, False if error + """ + return self._manager.driver.ask_verify(make_stream_command( + CMD_SET_STREAM_RATE_FRACTION, fraction, self)) + + def set_payload_id(self, identifier): + """ Set the test payload ID + :param identifier: ID as int or string + :return: Boolean True if success, False if error + """ + return self._manager.driver.ask_verify(make_stream_command( + CMD_SET_STREAM_TEST_PAYLOAD_ID, identifier, self)) + + +class XenaRXStats(object): + """ + Receive stat class + """ + def __init__(self, stats, epoc): + """ Constructor + :param stats: Stats from pr all command as list + :param epoc: Current time in epoc + :return: XenaRXStats object + """ + self._stats = stats + self._time = epoc + self.data = self.parse_stats() + self.preamble = 8 + + @staticmethod + def _pack_stats(param, start, fields=None): + """ Pack up the list of stats in a dictionary + :param param: The list of params to process + :param start: What element to start at + :param fields: The field names to pack as keys + :return: Dictionary of data where fields match up to the params + """ + if not fields: + fields = ['bps', 'pps', 'bytes', 'packets'] + data = {} + i = 0 + for column in fields: + data[column] = int(param[start + i]) + i += 1 + + return data + + @staticmethod + def _pack_tplds_stats(param, start): + """ Pack up the tplds stats + :param param: List of params to pack + :param start: What element to start at + :return: Dictionary of stats + """ + data = {} + i = 0 + for val in range(start, len(param) - start): + data[i] = int(param[val]) + i += 1 + return data + + def _pack_rxextra_stats(self, param, start): + """ Pack up the extra stats + :param param: List of params to pack + :param start: What element to start at + :return: Dictionary of stats + """ + fields = ['fcserrors', 'pauseframes', 'arprequests', 'arpreplies', + 'pingrequests', 'pingreplies', 'gapcount', 'gapduration'] + return self._pack_stats(param, start, fields) + + def _pack_tplderrors_stats(self, param, start): + """ Pack up tlpd errors + :param param: List of params to pack + :param start: What element to start at + :return: Dictionary of stats + """ + fields = ['dummy', 'seq', 'mis', 'pld'] + return self._pack_stats(param, start, fields) + + def _pack_tpldlatency_stats(self, param, start): + """ Pack up the tpld latency stats + :param param: List of params to pack + :param start: What element to start at + :return: Dictionary of stats + """ + fields = ['min', 'avg', 'max', '1sec'] + return self._pack_stats(param, start, fields) + + def _pack_tpldjitter_stats(self, param, start): + """ Pack up the tpld jitter stats + :param param: List of params to pack + :param start: What element to start at + :return: Dictionary of stats + """ + fields = ['min', 'avg', 'max', '1sec'] + return self._pack_stats(param, start, fields) + + @property + def time(self): + """ + :return: Time as String of epoc of when stats were collected + """ + return self._time + + def parse_stats(self): + """ Parse the stats from pr all command + :return: Dictionary of all stats + """ + statdict = {} + for line in self._stats: + param = line.split() + if param[1] == 'PR_TOTAL': + statdict['pr_total'] = self._pack_stats(param, 2) + elif param[1] == 'PR_NOTPLD': + statdict['pr_notpld'] = self._pack_stats(param, 2,) + elif param[1] == 'PR_EXTRA': + statdict['pr_extra'] = self._pack_rxextra_stats(param, 2) + elif param[1] == 'PT_STREAM': + entry_id = "pt_stream_%s" % param[2].strip('[]') + statdict[entry_id] = self._pack_stats(param, 3) + elif param[1] == 'PR_TPLDS': + tid_list = self._pack_tplds_stats(param, 2) + if len(tid_list): + statdict['pr_tplds'] = tid_list + elif param[1] == 'PR_TPLDTRAFFIC': + if 'pr_tpldstraffic' in statdict: + data = statdict['pr_tpldstraffic'] + else: + data = {} + entry_id = param[2].strip('[]') + data[entry_id] = self._pack_stats(param, 3) + statdict['pr_tpldstraffic'] = data + elif param[1] == 'PR_TPLDERRORS': + if 'pr_tplderrors' in statdict: + data = statdict['pr_tplderrors'] + else: + data = {} + entry_id = param[2].strip('[]') + data[entry_id] = self._pack_tplderrors_stats(param, 3) + statdict['pr_tplderrors'] = data + elif param[1] == 'PR_TPLDLATENCY': + if 'pr_tpldlatency' in statdict: + data = statdict['pr_tpldlatency'] + else: + data = {} + entry_id = param[2].strip('[]') + data[entry_id] = self._pack_tpldlatency_stats(param, 3) + statdict['pr_tpldlatency'] = data + elif param[1] == 'PR_TPLDJITTER': + if 'pr_tpldjitter' in statdict: + data = statdict['pr_tpldjitter'] + else: + data = {} + entry_id = param[2].strip('[]') + data[entry_id] = self._pack_tpldjitter_stats(param, 3) + statdict['pr_pldjitter'] = data + elif param[1] == 'PR_FILTER': + if 'pr_filter' in statdict: + data = statdict['pr_filter'] + else: + data = {} + entry_id = param[2].strip('[]') + data[entry_id] = self._pack_stats(param, 3) + statdict['pr_filter'] = data + elif param[1] == 'P_RECEIVESYNC': + if param[2] == 'IN_SYNC': + statdict['p_receivesync'] = {'IN SYNC': 'True'} + else: + statdict['p_receivesync'] = {'IN SYNC': 'False'} + else: + logging.warning("XenaPort: unknown stats: %s", param[1]) + + mydict = statdict + return mydict + + +class XenaTXStats(object): + """ + Xena transmit stat class + """ + def __init__(self, stats, epoc): + """ Constructor + :param stats: Stats from pt all command as list + :param epoc: Current time in epoc + :return: XenaTXStats object + """ + self._stats = stats + self._time = epoc + self._ptstreamkeys = list() + self.data = self.parse_stats() + self.preamble = 8 + + @staticmethod + def _pack_stats(params, start, fields=None): + """ Pack up the list of stats in a dictionary + :param params: The list of params to process + :param start: What element to start at + :param fields: The field names to pack as keys + :return: Dictionary of data where fields match up to the params + """ + if not fields: + fields = ['bps', 'pps', 'bytes', 'packets'] + data = {} + i = 0 + for column in fields: + data[column] = int(params[start + i]) + i += 1 + + return data + + def _pack_txextra_stats(self, params, start): + """ Pack up the tx extra stats + :param params: List of params to pack + :param start: What element to start at + :return: Dictionary of stats + """ + fields = ['arprequests', 'arpreplies', 'pingrequests', 'pingreplies', + 'injectedfcs', 'injectedseq', 'injectedmis', 'injectedint', + 'injectedtid', 'training'] + return self._pack_stats(params, start, fields) + + @property + def pt_stream_keys(self): + """ + :return: Return a list of pt_stream_x stream key ids + """ + return self._ptstreamkeys + + @property + def time(self): + """ + :return: Time as String of epoc of when stats were collected + """ + return self._time + + def parse_stats(self): + """ Parse the stats from pr all command + :return: Dictionary of all stats + """ + statdict = {} + for line in self._stats: + param = line.split() + if param[1] == 'PT_TOTAL': + statdict['pt_total'] = self._pack_stats(param, 2) + elif param[1] == 'PT_NOTPLD': + statdict['pt_notpld'] = self._pack_stats(param, 2,) + elif param[1] == 'PT_EXTRA': + statdict['pt_extra'] = self._pack_txextra_stats(param, 2) + elif param[1] == 'PT_STREAM': + entry_id = "pt_stream_%s" % param[2].strip('[]') + self._ptstreamkeys.append(entry_id) + statdict[entry_id] = self._pack_stats(param, 3) + else: + logging.warning("XenaPort: unknown stats: %s", param[1]) + mydict = statdict + return mydict + + +def aggregate_stats(stat1, stat2): + """ + Recursive function to aggregate two sets of statistics. This is used when + bi directional traffic is done and statistics need to be calculated based + on two sets of statistics. + :param stat1: One set of dictionary stats from RX or TX stats + :param stat2: Second set of dictionary stats from RX or TX stats + :return: stats for data entry in RX or TX Stats instance + """ + newstat = dict() + for (keys1, keys2) in zip(stat1.keys(), stat2.keys()): + if isinstance(stat1[keys1], dict): + newstat[keys1] = aggregate_stats(stat1[keys1], stat2[keys2]) + else: + if not isinstance(stat1[keys1], int) and not isinstance( + [keys1], float): + # its some value we don't need to aggregate + return stat1[keys1] + # for latency stats do the appropriate calculation + if keys1 == 'max': + newstat[keys1] = max(stat1[keys1], stat2[keys2]) + elif keys1 == 'min': + newstat[keys1] = min(stat1[keys1], stat2[keys2]) + elif keys1 == 'avg': + newstat[keys1] = (stat1[keys1] + stat2[keys2]) / 2 + else: + newstat[keys1] = (stat1[keys1] + stat2[keys2]) + return newstat + + +def line_percentage(port, stats, time_active, packet_size): + """ + Calculate the line percentage rate from the duration, port object and stat + object. + :param port: XenaPort object + :param stats: Xena RXStat or TXStat object + :param time_active: time the stream was active in secs as int + :param packet_size: packet size as int + :return: line percentage as float + """ + # this is ugly, but its prettier than calling the get method 3 times... + try: + packets = stats.data['pr_total']['packets'] + except KeyError: + try: + packets = stats.data['pt_total']['packets'] + except KeyError: + _LOGGER.error( + 'Could not calculate line rate because packet stat not found.') + return 0 + ifg = port.get_inter_frame_gap() + pps = packets_per_second(packets, time_active) + l2br = l2_bit_rate(packet_size, stats.preamble, pps) + l1br = l1_bit_rate(l2br, pps, ifg, stats.preamble) + return 100.0 * l1br / port.get_effective_speed() + + +def l2_bit_rate(packet_size, preamble, pps): + """ + Return the l2 bit rate + :param packet_size: packet size on the line in bytes + :param preamble: preamble size of the packet header in bytes + :param pps: packets per second + :return: l2 bit rate as float + """ + return (packet_size * preamble) * pps + + +def l1_bit_rate(l2br, pps, ifg, preamble): + """ + Return the l1 bit rate + :param l2br: l2 bit rate int bits per second + :param pps: packets per second + :param ifg: the inter frame gap + :param preamble: preamble size of the packet header in bytes + :return: l1 bit rate as float + """ + return l2br + (pps * ifg * preamble) + + +def make_manager_command(cmd, argument=None): + """ String builder for Xena socket commands + + :param cmd: Command to send + :param argument: Arguments for command to send + :return: String of command + """ + command = '{} "{}"'.format(cmd, argument) if argument else cmd + _LOGGER.info("[Command Sent] : %s", command) + return command + + +def make_port_command(cmd, xena_port): + """ String builder for Xena port commands + + :param cmd: Command to send + :param xena_port: XenaPort object + :return: String of command + """ + command = "{} {}".format(xena_port.port_string(), cmd) + _LOGGER.info("[Command Sent] : %s", command) + return command + + +def make_stream_command(cmd, args, xena_stream): + """ String builder for Xena port commands + + :param cmd: Command to send + :param xena_stream: XenaStream object + :return: String of command + """ + command = "{} {} [{}] {}".format(xena_stream.xena_port.port_string(), cmd, + xena_stream.stream_id, args) + _LOGGER.info("[Command Sent] : %s", command) + return command + + +def packets_per_second(packets, time_in_sec): + """ + Return the pps as float + :param packets: total packets + :param time_in_sec: time in seconds + :return: float of pps + """ + return packets / time_in_sec diff --git a/tools/pkt_gen/xena/xena.py b/tools/pkt_gen/xena/xena.py index d29fc362..99da4584 100755 --- a/tools/pkt_gen/xena/xena.py +++ b/tools/pkt_gen/xena/xena.py @@ -23,6 +23,7 @@ Xena Traffic Generator Model """ # python imports +import binascii import logging import subprocess import sys @@ -40,6 +41,15 @@ from tools.pkt_gen.trafficgen.trafficgen import ITrafficGenerator # Xena module imports from tools.pkt_gen.xena.xena_json import XenaJSON +from tools.pkt_gen.xena.XenaDriver import ( + aggregate_stats, + line_percentage, + XenaSocketDriver, + XenaManager, + ) + +# scapy imports +import scapy.layers.inet as inet class Xena(ITrafficGenerator): @@ -51,8 +61,12 @@ class Xena(ITrafficGenerator): def __init__(self): self.mono_pipe = None + self.xmanager = None self._params = {} + self._xsocket = None self._duration = None + self.tx_stats = None + self.rx_stats = None @property def traffic_defaults(self): @@ -127,6 +141,125 @@ class Xena(ITrafficGenerator): return results + def _build_packet_header(self, reverse=False): + """ + Build a packet header based on traffic profile using scapy external + libraries. + :param reverse: Swap source and destination info when building header + :return: packet header in hex + """ + srcmac = self._params['traffic']['l2'][ + 'srcmac'] if not reverse else self._params['traffic']['l2'][ + 'dstmac'] + dstmac = self._params['traffic']['l2'][ + 'dstmac'] if not reverse else self._params['traffic']['l2'][ + 'srcmac'] + srcip = self._params['traffic']['l3'][ + 'srcip'] if not reverse else self._params['traffic']['l3']['dstip'] + dstip = self._params['traffic']['l3'][ + 'dstip'] if not reverse else self._params['traffic']['l3']['srcip'] + layer2 = inet.Ether(src=srcmac, dst=dstmac) + layer3 = inet.IP(src=srcip, dst=dstip, + proto=self._params['traffic']['l3']['proto']) + layer4 = inet.UDP(sport=self._params['traffic']['l4']['srcport'], + dport=self._params['traffic']['l4']['dstport']) + if self._params['traffic']['vlan']['enabled']: + vlan = inet.Dot1Q(vlan=self._params['traffic']['vlan']['id'], + prio=self._params['traffic']['vlan']['priority'], + id=self._params['traffic']['vlan']['cfi']) + else: + vlan = None + packet = layer2/vlan/layer3/layer4 if vlan else layer2/layer3/layer4 + packet_bytes = bytes(packet) + packet_hex = '0x' + binascii.hexlify(packet_bytes).decode('utf-8') + return packet_hex + + def _create_api_result(self): + """ + Create result dictionary per trafficgen specifications from socket API + stats. If stats are not available return values of 0. + :return: ResultsConstants as dictionary + """ + # Handle each case of statistics based on if the data is available. + # This prevents uncaught exceptions when the stats aren't available. + result_dict = OrderedDict() + if self.tx_stats.data.get(self.tx_stats.pt_stream_keys[0]): + result_dict[ResultsConstants.TX_FRAMES] = self.tx_stats.data[ + self.tx_stats.pt_stream_keys[0]]['packets'] + result_dict[ResultsConstants.TX_RATE_FPS] = self.tx_stats.data[ + self.tx_stats.pt_stream_keys[0]]['pps'] + result_dict[ResultsConstants.TX_RATE_MBPS] = self.tx_stats.data[ + self.tx_stats.pt_stream_keys[0]]['bps'] / 1000000 + result_dict[ResultsConstants.TX_BYTES] = self.tx_stats.data[ + self.tx_stats.pt_stream_keys[0]]['bytes'] + # tx rate percent may need to be halved if bi directional + result_dict[ResultsConstants.TX_RATE_PERCENT] = line_percentage( + self.xmanager.ports[0], self.tx_stats, self._duration, + self._params['traffic']['l2']['framesize']) if \ + self._params['traffic']['bidir'] == 'False' else\ + line_percentage( + self.xmanager.ports[0], self.tx_stats, self._duration, + self._params['traffic']['l2']['framesize']) / 2 + else: + self._logger.error('Transmit stats not available.') + result_dict[ResultsConstants.TX_FRAMES] = 0 + result_dict[ResultsConstants.TX_RATE_FPS] = 0 + result_dict[ResultsConstants.TX_RATE_MBPS] = 0 + result_dict[ResultsConstants.TX_BYTES] = 0 + result_dict[ResultsConstants.TX_RATE_PERCENT] = 0 + + if self.rx_stats.data.get('pr_tpldstraffic'): + result_dict[ResultsConstants.RX_FRAMES] = self.rx_stats.data[ + 'pr_tpldstraffic']['0']['packets'] + result_dict[ + ResultsConstants.THROUGHPUT_RX_FPS] = self.rx_stats.data[ + 'pr_tpldstraffic']['0']['pps'] + result_dict[ + ResultsConstants.THROUGHPUT_RX_MBPS] = self.rx_stats.data[ + 'pr_tpldstraffic']['0']['bps'] / 1000000 + result_dict[ResultsConstants.RX_BYTES] = self.rx_stats.data[ + 'pr_tpldstraffic']['0']['bytes'] + # throughput percent may need to be halved if bi directional + result_dict[ + ResultsConstants.THROUGHPUT_RX_PERCENT] = line_percentage( + self.xmanager.ports[1], self.rx_stats, self._duration, + self._params['traffic']['l2']['framesize']) if \ + self._params['traffic']['bidir'] == 'False' else \ + line_percentage( + self.xmanager.ports[1], self.rx_stats, self._duration, + self._params['traffic']['l2']['framesize']) / 2 + + else: + self._logger.error('Receive stats not available.') + result_dict[ResultsConstants.RX_FRAMES] = 0 + result_dict[ResultsConstants.THROUGHPUT_RX_FPS] = 0 + result_dict[ResultsConstants.THROUGHPUT_RX_MBPS] = 0 + result_dict[ResultsConstants.RX_BYTES] = 0 + result_dict[ResultsConstants.THROUGHPUT_RX_PERCENT] = 0 + + if self.rx_stats.data.get('pr_tplderrors'): + result_dict[ResultsConstants.PAYLOAD_ERR] = self.rx_stats.data[ + 'pr_tplderrors']['0']['pld'] + result_dict[ResultsConstants.SEQ_ERR] = self.rx_stats.data[ + 'pr_tplderrors']['0']['seq'] + else: + result_dict[ResultsConstants.PAYLOAD_ERR] = 0 + result_dict[ResultsConstants.SEQ_ERR] = 0 + + if self.rx_stats.data.get('pr_tpldlatency'): + result_dict[ResultsConstants.MIN_LATENCY_NS] = self.rx_stats.data[ + 'pr_tpldlatency']['0']['min'] + result_dict[ResultsConstants.MAX_LATENCY_NS] = self.rx_stats.data[ + 'pr_tpldlatency']['0']['max'] + result_dict[ResultsConstants.AVG_LATENCY_NS] = self.rx_stats.data[ + 'pr_tpldlatency']['0']['avg'] + else: + result_dict[ResultsConstants.MIN_LATENCY_NS] = 0 + result_dict[ResultsConstants.MAX_LATENCY_NS] = 0 + result_dict[ResultsConstants.AVG_LATENCY_NS] = 0 + + return result_dict + def _setup_json_config(self, trials, loss_rate, testtype=None): """ Create a 2bUsed json file that will be used for xena2544.exe execution. @@ -192,31 +325,160 @@ class Xena(ITrafficGenerator): self._logger.exception("Error during Xena JSON setup: %s", exc) raise - def connect(self): - """Connect to the traffic generator. + def _start_traffic_api(self, packet_limit): + """ + Start the Xena traffic using the socket API driver + :param packet_limit: packet limit for stream, set to -1 for no limit + :return: None + """ + if not self.xmanager: + self._xsocket = XenaSocketDriver( + settings.getValue('TRAFFICGEN_XENA_IP')) + self.xmanager = XenaManager( + self._xsocket, settings.getValue('TRAFFICGEN_XENA_USER'), + settings.getValue('TRAFFICGEN_XENA_PASSWORD')) + + # for the report file version info ask the chassis directly for its + # software versions + settings.setValue('XENA_VERSION', 'XENA Socket API - {}'.format( + self.xmanager.get_version())) + + if not len(self.xmanager.ports): + self.xmanager.ports[0] = self.xmanager.add_module_port( + settings.getValue('TRAFFICGEN_XENA_MODULE1'), + settings.getValue('TRAFFICGEN_XENA_PORT1')) + if not self.xmanager.ports[0].reserve_port(): + self._logger.error( + 'Unable to reserve port 0. Please release Xena Port') + + if len(self.xmanager.ports) < 2: + self.xmanager.ports[1] = self.xmanager.add_module_port( + settings.getValue('TRAFFICGEN_XENA_MODULE2'), + settings.getValue('TRAFFICGEN_XENA_PORT2')) + if not self.xmanager.ports[1].reserve_port(): + self._logger.error( + 'Unable to reserve port 1. Please release Xena Port') + + # Clear port configuration for a clean start + self.xmanager.ports[0].reset_port() + self.xmanager.ports[1].reset_port() + self.xmanager.ports[0].clear_stats() + self.xmanager.ports[1].clear_stats() + + # set the port IP from the conf file + self.xmanager.ports[0].set_port_ip( + settings.getValue('TRAFFICGEN_XENA_PORT0_IP'), + settings.getValue('TRAFFICGEN_XENA_PORT0_CIDR'), + settings.getValue('TRAFFICGEN_XENA_PORT0_GATEWAY')) + self.xmanager.ports[1].set_port_ip( + settings.getValue('TRAFFICGEN_XENA_PORT1_IP'), + settings.getValue('TRAFFICGEN_XENA_PORT1_CIDR'), + settings.getValue('TRAFFICGEN_XENA_PORT1_GATEWAY')) + + def setup_stream(stream, port, payload_id, flip_addr=False): + """ + Helper function to configure streams. + :param stream: Stream object from XenaDriver module + :param port: Port object from XenaDriver module + :param payload_id: payload ID as int + :param flip_addr: Boolean if the source and destination addresses + should be flipped. + :return: None + """ + stream.set_on() + stream.set_packet_limit(packet_limit) + + stream.set_rate_fraction( + 10000 * self._params['traffic']['frame_rate']) + stream.set_packet_header(self._build_packet_header( + reverse=flip_addr)) + stream.set_header_protocol( + 'ETHERNET VLAN IP UDP' if self._params['traffic']['vlan'][ + 'enabled'] else 'ETHERNET IP UDP') + stream.set_packet_length( + 'fixed', self._params['traffic']['l2']['framesize'], 16383) + stream.set_packet_payload('incrementing', '0x00') + stream.set_payload_id(payload_id) + port.set_port_time_limit(self._duration * 1000000) + + if self._params['traffic']['l2']['framesize'] == 64: + # set micro tpld + port.micro_tpld_enable() + + if self._params['traffic']['multistream']: + stream.enable_multistream( + flows=self._params['traffic']['multistream'], + layer=self._params['traffic']['stream_type']) + + s1_p0 = self.xmanager.ports[0].add_stream() + setup_stream(s1_p0, self.xmanager.ports[0], 0) + + if self._params['traffic']['bidir'] == 'True': + s1_p1 = self.xmanager.ports[1].add_stream() + setup_stream(s1_p1, self.xmanager.ports[1], 1, flip_addr=True) + + if not self.xmanager.ports[0].traffic_on(): + self._logger.error( + "Failure to start port 0. Check settings and retry.") + if self._params['traffic']['bidir'] == 'True': + if not self.xmanager.ports[1].traffic_on(): + self._logger.error( + "Failure to start port 1. Check settings and retry.") + sleep(self._duration) + # getting results + if self._params['traffic']['bidir'] == 'True': + # need to aggregate out both ports stats and assign that data + self.rx_stats = self.xmanager.ports[1].get_rx_stats() + self.tx_stats = self.xmanager.ports[0].get_tx_stats() + self.tx_stats.data = aggregate_stats( + self.tx_stats.data, + self.xmanager.ports[1].get_tx_stats().data) + self.rx_stats.data = aggregate_stats( + self.rx_stats.data, + self.xmanager.ports[0].get_rx_stats().data) + else: + # no need to aggregate, just grab the appropriate port stats + self.tx_stats = self.xmanager.ports[0].get_tx_stats() + self.rx_stats = self.xmanager.ports[1].get_rx_stats() + sleep(1) - This is an optional function, designed for traffic generators - which must be "connected to" (i.e. via SSH or an API) before - they can be used. If not required, simply do nothing here. + def _stop_api_traffic(self): + """ + Stop traffic through the socket API + :return: Return results from _create_api_result method + """ + self.xmanager.ports[0].traffic_off() + if self._params['traffic']['bidir'] == 'True': + self.xmanager.ports[1].traffic_off() + sleep(5) - Where implemented, this function should raise an exception on - failure. + stat = self._create_api_result() + self.disconnect() + return stat - :returns: None - """ - pass + def connect(self): + self._logger.debug('Connect') + return self def disconnect(self): """Disconnect from the traffic generator. As with :func:`connect`, this function is optional. + Where implemented, this function should raise an exception on failure. :returns: None """ - pass + self._logger.debug('disconnect') + if self.xmanager: + self.xmanager.disconnect() + self.xmanager = None + + if self._xsocket: + self._xsocket.disconnect() + self._xsocket = None def send_burst_traffic(self, traffic=None, numpkts=100, duration=20): """Send a burst of traffic. @@ -239,40 +501,40 @@ class Xena(ITrafficGenerator): raise NotImplementedError('Xena burst traffic not implemented') def send_cont_traffic(self, traffic=None, duration=20): - """Send a continuous flow of traffic.r - - Send packets at ``framerate``, using ``traffic`` configuration, - until timeout ``time`` occurs. + """Send a continuous flow of traffic. - :param traffic: Detailed "traffic" spec, i.e. IP address, VLAN tags - :param duration: Time to wait to receive packets (secs) - :returns: dictionary of strings with following data: - - Tx Throughput (fps), - - Rx Throughput (fps), - - Tx Throughput (mbps), - - Rx Throughput (mbps), - - Tx Throughput (% linerate), - - Rx Throughput (% linerate), - - Min Latency (ns), - - Max Latency (ns), - - Avg Latency (ns) + See ITrafficGenerator for description """ - raise NotImplementedError('Xena continuous traffic not implemented') + self._duration = duration + + self._params.clear() + self._params['traffic'] = self.traffic_defaults.copy() + if traffic: + self._params['traffic'] = merge_spec(self._params['traffic'], + traffic) + + self._start_traffic_api(-1) + return self._stop_api_traffic() def start_cont_traffic(self, traffic=None, duration=20): """Non-blocking version of 'send_cont_traffic'. - Start transmission and immediately return. Do not wait for - results. - :param traffic: Detailed "traffic" spec, i.e. IP address, VLAN tags - :param duration: Time to wait to receive packets (secs) + See ITrafficGenerator for description """ - raise NotImplementedError('Xena continuous traffic not implemented') + self._duration = duration + + self._params.clear() + self._params['traffic'] = self.traffic_defaults.copy() + if traffic: + self._params['traffic'] = merge_spec(self._params['traffic'], + traffic) + + self._start_traffic_api(-1) def stop_cont_traffic(self): """Stop continuous transmission and return results. """ - raise NotImplementedError('Xena continuous traffic not implemented') + return self._stop_api_traffic() def send_rfc2544_throughput(self, traffic=None, trials=3, duration=20, lossrate=0.0): -- cgit 1.2.3-korg