diff options
Diffstat (limited to 'vstf/vstf/agent/spirent/tools.py')
-rw-r--r-- | vstf/vstf/agent/spirent/tools.py | 334 |
1 files changed, 0 insertions, 334 deletions
diff --git a/vstf/vstf/agent/spirent/tools.py b/vstf/vstf/agent/spirent/tools.py deleted file mode 100644 index 088a7b13..00000000 --- a/vstf/vstf/agent/spirent/tools.py +++ /dev/null @@ -1,334 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - - -import time -from spirent import stcPython - -class Spirent_Tools(object): - baseAPI = stcPython() - def __init__(self): - """This class provide API of Spirent - - """ - super(Spirent_Tools, self).__init__() - - def send_packet(self,flow): - try: - #import pdb - #pdb.set_trace() - flow = eval(flow) - #stc init action - self.baseAPI.stc_perform(' ResetConfig -config system1') - self.baseAPI.stc_init() - #create project - project = self.baseAPI.stc_create_project() - #create port - port_handle = self.baseAPI.stc_create_port(project) - #config port - slot = flow['send_port'].split('/')[0] - port = flow['send_port'].split('/')[1] - self.baseAPI.stc_config_port_location(port_handle,flow['tester_ip'],slot,port) - #create streamblock - streamblock_handle = self.baseAPI.stc_create_streamblock( - port_name = port_handle, - ExpectedRxPort = '', - vlan_tag = flow['vlan'], - srcMac = flow['src_mac'], - dstMac = flow['dst_mac'], - sourceAddr = flow['src_ip'], - destAddr =flow['dst_ip'] - ) - # attach port - port_list = [port_handle] - self.baseAPI.stc_attach_ports(port_list) - #start streamblock - streamblock_list = [streamblock_handle] - flag = self.baseAPI.stc_streamblock_start(streamblock_list) - return str(streamblock_list).strip('[]') - except : - print("[ERROR]create stream block and send packet failed.") - return False - - def mac_learning(self,flowA,flowB): - try: - #import pdb - #pdb.set_trace() - flowA = eval(flowA) - flowB = eval(flowB) - port_list = [] - streamblock_list = [] - #stc init action - self.baseAPI.stc_perform(' ResetConfig -config system1') - self.baseAPI.stc_init() - #create project - project = self.baseAPI.stc_create_project() - #create port and config port - for flow in [ flowA,flowB ]: - flow['port_handle'] = self.baseAPI.stc_create_port(project) - tmp_test_ip = flow['tester_ip'] - tmp_slot = flow['send_port'].split('/')[0] - tmp_port = flow['send_port'].split('/')[1] - self.baseAPI.stc_config_port_location(flow['port_handle'],tmp_test_ip,tmp_slot,tmp_port) - #create streamblock - flow['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = flow['port_handle'], - ExpectedRxPort = '', - vlan_tag = flow['vlan'], - srcMac = flow['src_mac'], - dstMac = flow['dst_mac'], - sourceAddr = flow['src_ip'], - destAddr =flow['dst_ip']) - #create port and stream block list - port_list.append(flow['port_handle']) - streamblock_list.append(flow['streamblock']) - - #attach port - self.baseAPI.stc_attach_ports(port_list) - #start streamblock - flag = self.baseAPI.stc_streamblock_start(streamblock_list) - # mac learning - time.sleep(2) - # stop stream block - self.baseAPI.stc_streamblock_stop(streamblock_list) - # delete streamblock and release port - for flow in [ flowA,flowB ]: - tmp_test_ip = flow['tester_ip'] - tmp_slot = flow['send_port'].split('/')[0] - tmp_port = flow['send_port'].split('/')[1] - self.baseAPI.stc_delete(flow['streamblock']) - self.baseAPI.stc_release('%s/%s/%s' %(tmp_test_ip,tmp_slot,tmp_port)) - # delete project - self.baseAPI.stc_delete('project1') - ret = self.baseAPI.stc_perform('ResetConfig -config system1') - return True - except : - print("[ERROR]mac learning failed") - return False - - def stop_flow(self,streamblock_list,flow): - flow = eval(flow) - streamblock_list = streamblock_list.strip('\'').split(',') - #stop streamblock list - try : - ret = self.baseAPI.stc_streamblock_stop(streamblock_list) - except : - print("[ERROR]Stop the streamblock list failed.") - return False - #delete streamblock - try : - for streamblock in streamblock_list : - ret = self.baseAPI.stc_delete(streamblock) - except : - print("[ERROR]delete stream block.") - return False - #release port - try : - slot = flow['send_port'].split('/')[0] - port = flow['send_port'].split('/')[1] - ret = self.baseAPI.stc_release('%s/%s/%s' %(flow['tester_ip'],slot,port)) - except : - print("[ERROR]Release port failed") - return False - ##delete project - try : - ret = self.baseAPI.stc_delete('project1') - ret = self.baseAPI.stc_perform('ResetConfig -config system1') - return True - except : - print("[ERROR]Delete project1 failed.") - return False - - def run_rfc2544_throughput(self,forward_init_flows,reverse_init_flows): - #import pdb - #pdb.set_trace() - #rebuild the flows - forward_init_flows = eval(forward_init_flows) - reverse_init_flows = eval(reverse_init_flows) - #stc init action - self.baseAPI.stc_perform(' ResetConfig -config system1') - self.baseAPI.stc_init() - #create project - project = self.baseAPI.stc_create_project() - #create sequencer - seq_handle = self.baseAPI.stc_create('Sequencer -under %s' %(project)) - #create port handle - forward_port_handle = self.baseAPI.stc_create_port(project) - reverse_port_handle = self.baseAPI.stc_create_port(project) - #create forward flow streamblock - for key in forward_init_flows.keys(): - forward_init_flows[key]['port_handle'] = forward_port_handle - tmp_test_ip = forward_init_flows[key]['tester_ip'] - tmp_slot = forward_init_flows[key]['send_port'].split('/')[0] - tmp_port = forward_init_flows[key]['send_port'].split('/')[1] - self.baseAPI.stc_config_port_location(forward_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port) - #create streamblock - forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = forward_init_flows[key]['port_handle'], - vlan_tag = forward_init_flows[key]['vlan'], - ExpectedRxPort = reverse_port_handle, - srcMac = forward_init_flows[key]['src_mac'], - dstMac = forward_init_flows[key]['dst_mac'], - sourceAddr = forward_init_flows[key]['src_ip'], - destAddr = forward_init_flows[key]['dst_ip']) - #create reverse flow streamblock - for key in reverse_init_flows.keys(): - reverse_init_flows[key]['port_handle'] = reverse_port_handle - tmp_test_ip = reverse_init_flows[key]['tester_ip'] - tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0] - tmp_port = reverse_init_flows[key]['send_port'].split('/')[1] - self.baseAPI.stc_config_port_location(reverse_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port) - #create streamblock - reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = reverse_init_flows[key]['port_handle'], - vlan_tag = reverse_init_flows[key]['vlan'], - ExpectedRxPort = forward_port_handle, - srcMac = reverse_init_flows[key]['src_mac'], - dstMac = reverse_init_flows[key]['dst_mac'], - sourceAddr = reverse_init_flows[key]['src_ip'], - destAddr = reverse_init_flows[key]['dst_ip']) - #Create the RFC 2544 throughput test - throughput_config = self.baseAPI.stc_create('Rfc2544ThroughputConfig -under ',project, - '-AcceptableFrameLoss 0.01', - '-NumOfTrials 1', - '-DurationSeconds 60', - '-SearchMode BINARY', - '-RateLowerLimit 1', - '-RateUpperLimit 100', - '-RateInitial 10', - '-UseExistingStreamBlocks True', - '-EnableLearning False', - '-FrameSizeIterationMode CUSTOM', - '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"', - '-LatencyType LIFO', - '-EnableJitterMeasurement TRUE' - ) - #import pdb - #pdb.set_trace() - # list streamblocks - streamblock_list = '" ' - for key in forward_init_flows.keys(): - streamblock_list = streamblock_list+forward_init_flows[key]['streamblock']+' ' - for key in reverse_init_flows.keys(): - streamblock_list = streamblock_list+reverse_init_flows[key]['streamblock']+' ' - streamblock_list = streamblock_list+'"' - - throughput_sbProfile= self.baseAPI.stc_create('Rfc2544StreamBlockProfile -under '+throughput_config+' -Active TRUE -LocalActive TRUE') - self.baseAPI.stc_config(throughput_sbProfile,'-StreamBlockList '+streamblock_list) - self.baseAPI.stc_perform('ExpandBenchmarkConfigCommand','-config ',throughput_config) - - #attach the port before testing - port_list = [ forward_port_handle,reverse_port_handle] - self.baseAPI.stc_attach_ports(port_list) - - #stc apply and begin to sequence test - self.baseAPI.stc_apply() - self.baseAPI.stc_perform("SequencerStart") - - #wait until complete - self.baseAPI.stc_waituntilcomplete() - - #get result db - resultsdb = self.baseAPI.stc_get("system1.project.TestResultSetting", "-CurrentResultFileName") - results_dict = self.baseAPI.stc_perform('QueryResult','-DatabaseConnectionString',resultsdb,'-ResultPath RFC2544ThroughputTestResultDetailedSummaryView') - #print results_dict - return True,results_dict - - def run_rfc2544_frameloss(self,forward_init_flows,reverse_init_flows): - #import pdb - #pdb.set_trace() - #rebuild the flows - forward_init_flows = eval(forward_init_flows) - reverse_init_flows = eval(reverse_init_flows) - #stc init action - self.baseAPI.stc_perform(' ResetConfig -config system1') - self.baseAPI.stc_init() - #create project - project = self.baseAPI.stc_create_project() - #create sequencer - seq_handle = self.baseAPI.stc_create('Sequencer -under %s' %(project)) - #create port handle - forward_port_handle = self.baseAPI.stc_create_port(project) - reverse_port_handle = self.baseAPI.stc_create_port(project) - #create forward flow streamblock - for key in forward_init_flows.keys(): - forward_init_flows[key]['port_handle'] = forward_port_handle - tmp_test_ip = forward_init_flows[key]['tester_ip'] - tmp_slot = forward_init_flows[key]['send_port'].split('/')[0] - tmp_port = forward_init_flows[key]['send_port'].split('/')[1] - self.baseAPI.stc_config_port_location(forward_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port) - #create streamblock - forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = forward_init_flows[key]['port_handle'], - vlan_tag = forward_init_flows[key]['vlan'], - ExpectedRxPort = reverse_port_handle, - srcMac = forward_init_flows[key]['src_mac'], - dstMac = forward_init_flows[key]['dst_mac'], - sourceAddr = forward_init_flows[key]['src_ip'], - destAddr = forward_init_flows[key]['dst_ip']) - #create reverse flow streamblock - for key in reverse_init_flows.keys(): - reverse_init_flows[key]['port_handle'] = reverse_port_handle - tmp_test_ip = reverse_init_flows[key]['tester_ip'] - tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0] - tmp_port = reverse_init_flows[key]['send_port'].split('/')[1] - self.baseAPI.stc_config_port_location(reverse_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port) - #create streamblock - reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = reverse_init_flows[key]['port_handle'], - vlan_tag = reverse_init_flows[key]['vlan'], - ExpectedRxPort = forward_port_handle, - srcMac = reverse_init_flows[key]['src_mac'], - dstMac = reverse_init_flows[key]['dst_mac'], - sourceAddr = reverse_init_flows[key]['src_ip'], - destAddr = reverse_init_flows[key]['dst_ip']) - #Create the RFC 2544 frameloss test - frameloss_config = self.baseAPI.stc_create('Rfc2544FrameLossConfig -under ',project, - '-NumOfTrials 1 ', - '-DurationSeconds 60 ', - '-LoadUnits PERCENT_LINE_RATE ', - '-LoadType CUSTOM ' - '-CustomLoadList 100 ' - '-UseExistingStreamBlocks True ', - '-EnableLearning False ', - '-FrameSizeIterationMode CUSTOM ', - '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"', - '-LatencyType LIFO', - '-EnableJitterMeasurement TRUE' - ) - #import pdb - #pdb.set_trace() - # list streamblocks - streamblock_list = '" ' - for key in forward_init_flows.keys(): - streamblock_list = streamblock_list+forward_init_flows[key]['streamblock']+' ' - for key in reverse_init_flows.keys(): - streamblock_list = streamblock_list+reverse_init_flows[key]['streamblock']+' ' - streamblock_list = streamblock_list+'"' - - frameloss_sbProfile= self.baseAPI.stc_create('Rfc2544StreamBlockProfile -under '+frameloss_config+' -Active TRUE -LocalActive TRUE') - self.baseAPI.stc_config(frameloss_sbProfile,'-StreamBlockList '+streamblock_list) - self.baseAPI.stc_perform('ExpandBenchmarkConfigCommand','-config ',frameloss_config) - - #attach the port before testing - port_list = [ forward_port_handle,reverse_port_handle] - self.baseAPI.stc_attach_ports(port_list) - - #stc apply and begin to sequence test - self.baseAPI.stc_apply() - self.baseAPI.stc_perform("SequencerStart") - - #wait until complete - self.baseAPI.stc_waituntilcomplete() - - #get result db - resultsdb = self.baseAPI.stc_get("system1.project.TestResultSetting", "-CurrentResultFileName") - results_dict = self.baseAPI.stc_perform('QueryResult','-DatabaseConnectionString',resultsdb,'-ResultPath RFC2544FrameLossTestResultDetailedSummaryView') - #import pdb - #pdb.set_trace() - return True,results_dict - - def run_rfc2544_latency(self,forward_init_flows,reverse_init_flows): - pass - |