summaryrefslogtreecommitdiffstats
path: root/vstf/vstf/agent/spirent/tools.py
diff options
context:
space:
mode:
Diffstat (limited to 'vstf/vstf/agent/spirent/tools.py')
-rw-r--r--vstf/vstf/agent/spirent/tools.py334
1 files changed, 0 insertions, 334 deletions
diff --git a/vstf/vstf/agent/spirent/tools.py b/vstf/vstf/agent/spirent/tools.py
deleted file mode 100644
index 088a7b13..00000000
--- a/vstf/vstf/agent/spirent/tools.py
+++ /dev/null
@@ -1,334 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-
-import time
-from spirent import stcPython
-
-class Spirent_Tools(object):
- baseAPI = stcPython()
- def __init__(self):
- """This class provide API of Spirent
-
- """
- super(Spirent_Tools, self).__init__()
-
- def send_packet(self,flow):
- try:
- #import pdb
- #pdb.set_trace()
- flow = eval(flow)
- #stc init action
- self.baseAPI.stc_perform(' ResetConfig -config system1')
- self.baseAPI.stc_init()
- #create project
- project = self.baseAPI.stc_create_project()
- #create port
- port_handle = self.baseAPI.stc_create_port(project)
- #config port
- slot = flow['send_port'].split('/')[0]
- port = flow['send_port'].split('/')[1]
- self.baseAPI.stc_config_port_location(port_handle,flow['tester_ip'],slot,port)
- #create streamblock
- streamblock_handle = self.baseAPI.stc_create_streamblock(
- port_name = port_handle,
- ExpectedRxPort = '',
- vlan_tag = flow['vlan'],
- srcMac = flow['src_mac'],
- dstMac = flow['dst_mac'],
- sourceAddr = flow['src_ip'],
- destAddr =flow['dst_ip']
- )
- # attach port
- port_list = [port_handle]
- self.baseAPI.stc_attach_ports(port_list)
- #start streamblock
- streamblock_list = [streamblock_handle]
- flag = self.baseAPI.stc_streamblock_start(streamblock_list)
- return str(streamblock_list).strip('[]')
- except :
- print("[ERROR]create stream block and send packet failed.")
- return False
-
- def mac_learning(self,flowA,flowB):
- try:
- #import pdb
- #pdb.set_trace()
- flowA = eval(flowA)
- flowB = eval(flowB)
- port_list = []
- streamblock_list = []
- #stc init action
- self.baseAPI.stc_perform(' ResetConfig -config system1')
- self.baseAPI.stc_init()
- #create project
- project = self.baseAPI.stc_create_project()
- #create port and config port
- for flow in [ flowA,flowB ]:
- flow['port_handle'] = self.baseAPI.stc_create_port(project)
- tmp_test_ip = flow['tester_ip']
- tmp_slot = flow['send_port'].split('/')[0]
- tmp_port = flow['send_port'].split('/')[1]
- self.baseAPI.stc_config_port_location(flow['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
- #create streamblock
- flow['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = flow['port_handle'],
- ExpectedRxPort = '',
- vlan_tag = flow['vlan'],
- srcMac = flow['src_mac'],
- dstMac = flow['dst_mac'],
- sourceAddr = flow['src_ip'],
- destAddr =flow['dst_ip'])
- #create port and stream block list
- port_list.append(flow['port_handle'])
- streamblock_list.append(flow['streamblock'])
-
- #attach port
- self.baseAPI.stc_attach_ports(port_list)
- #start streamblock
- flag = self.baseAPI.stc_streamblock_start(streamblock_list)
- # mac learning
- time.sleep(2)
- # stop stream block
- self.baseAPI.stc_streamblock_stop(streamblock_list)
- # delete streamblock and release port
- for flow in [ flowA,flowB ]:
- tmp_test_ip = flow['tester_ip']
- tmp_slot = flow['send_port'].split('/')[0]
- tmp_port = flow['send_port'].split('/')[1]
- self.baseAPI.stc_delete(flow['streamblock'])
- self.baseAPI.stc_release('%s/%s/%s' %(tmp_test_ip,tmp_slot,tmp_port))
- # delete project
- self.baseAPI.stc_delete('project1')
- ret = self.baseAPI.stc_perform('ResetConfig -config system1')
- return True
- except :
- print("[ERROR]mac learning failed")
- return False
-
- def stop_flow(self,streamblock_list,flow):
- flow = eval(flow)
- streamblock_list = streamblock_list.strip('\'').split(',')
- #stop streamblock list
- try :
- ret = self.baseAPI.stc_streamblock_stop(streamblock_list)
- except :
- print("[ERROR]Stop the streamblock list failed.")
- return False
- #delete streamblock
- try :
- for streamblock in streamblock_list :
- ret = self.baseAPI.stc_delete(streamblock)
- except :
- print("[ERROR]delete stream block.")
- return False
- #release port
- try :
- slot = flow['send_port'].split('/')[0]
- port = flow['send_port'].split('/')[1]
- ret = self.baseAPI.stc_release('%s/%s/%s' %(flow['tester_ip'],slot,port))
- except :
- print("[ERROR]Release port failed")
- return False
- ##delete project
- try :
- ret = self.baseAPI.stc_delete('project1')
- ret = self.baseAPI.stc_perform('ResetConfig -config system1')
- return True
- except :
- print("[ERROR]Delete project1 failed.")
- return False
-
- def run_rfc2544_throughput(self,forward_init_flows,reverse_init_flows):
- #import pdb
- #pdb.set_trace()
- #rebuild the flows
- forward_init_flows = eval(forward_init_flows)
- reverse_init_flows = eval(reverse_init_flows)
- #stc init action
- self.baseAPI.stc_perform(' ResetConfig -config system1')
- self.baseAPI.stc_init()
- #create project
- project = self.baseAPI.stc_create_project()
- #create sequencer
- seq_handle = self.baseAPI.stc_create('Sequencer -under %s' %(project))
- #create port handle
- forward_port_handle = self.baseAPI.stc_create_port(project)
- reverse_port_handle = self.baseAPI.stc_create_port(project)
- #create forward flow streamblock
- for key in forward_init_flows.keys():
- forward_init_flows[key]['port_handle'] = forward_port_handle
- tmp_test_ip = forward_init_flows[key]['tester_ip']
- tmp_slot = forward_init_flows[key]['send_port'].split('/')[0]
- tmp_port = forward_init_flows[key]['send_port'].split('/')[1]
- self.baseAPI.stc_config_port_location(forward_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
- #create streamblock
- forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = forward_init_flows[key]['port_handle'],
- vlan_tag = forward_init_flows[key]['vlan'],
- ExpectedRxPort = reverse_port_handle,
- srcMac = forward_init_flows[key]['src_mac'],
- dstMac = forward_init_flows[key]['dst_mac'],
- sourceAddr = forward_init_flows[key]['src_ip'],
- destAddr = forward_init_flows[key]['dst_ip'])
- #create reverse flow streamblock
- for key in reverse_init_flows.keys():
- reverse_init_flows[key]['port_handle'] = reverse_port_handle
- tmp_test_ip = reverse_init_flows[key]['tester_ip']
- tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0]
- tmp_port = reverse_init_flows[key]['send_port'].split('/')[1]
- self.baseAPI.stc_config_port_location(reverse_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
- #create streamblock
- reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = reverse_init_flows[key]['port_handle'],
- vlan_tag = reverse_init_flows[key]['vlan'],
- ExpectedRxPort = forward_port_handle,
- srcMac = reverse_init_flows[key]['src_mac'],
- dstMac = reverse_init_flows[key]['dst_mac'],
- sourceAddr = reverse_init_flows[key]['src_ip'],
- destAddr = reverse_init_flows[key]['dst_ip'])
- #Create the RFC 2544 throughput test
- throughput_config = self.baseAPI.stc_create('Rfc2544ThroughputConfig -under ',project,
- '-AcceptableFrameLoss 0.01',
- '-NumOfTrials 1',
- '-DurationSeconds 60',
- '-SearchMode BINARY',
- '-RateLowerLimit 1',
- '-RateUpperLimit 100',
- '-RateInitial 10',
- '-UseExistingStreamBlocks True',
- '-EnableLearning False',
- '-FrameSizeIterationMode CUSTOM',
- '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"',
- '-LatencyType LIFO',
- '-EnableJitterMeasurement TRUE'
- )
- #import pdb
- #pdb.set_trace()
- # list streamblocks
- streamblock_list = '" '
- for key in forward_init_flows.keys():
- streamblock_list = streamblock_list+forward_init_flows[key]['streamblock']+' '
- for key in reverse_init_flows.keys():
- streamblock_list = streamblock_list+reverse_init_flows[key]['streamblock']+' '
- streamblock_list = streamblock_list+'"'
-
- throughput_sbProfile= self.baseAPI.stc_create('Rfc2544StreamBlockProfile -under '+throughput_config+' -Active TRUE -LocalActive TRUE')
- self.baseAPI.stc_config(throughput_sbProfile,'-StreamBlockList '+streamblock_list)
- self.baseAPI.stc_perform('ExpandBenchmarkConfigCommand','-config ',throughput_config)
-
- #attach the port before testing
- port_list = [ forward_port_handle,reverse_port_handle]
- self.baseAPI.stc_attach_ports(port_list)
-
- #stc apply and begin to sequence test
- self.baseAPI.stc_apply()
- self.baseAPI.stc_perform("SequencerStart")
-
- #wait until complete
- self.baseAPI.stc_waituntilcomplete()
-
- #get result db
- resultsdb = self.baseAPI.stc_get("system1.project.TestResultSetting", "-CurrentResultFileName")
- results_dict = self.baseAPI.stc_perform('QueryResult','-DatabaseConnectionString',resultsdb,'-ResultPath RFC2544ThroughputTestResultDetailedSummaryView')
- #print results_dict
- return True,results_dict
-
- def run_rfc2544_frameloss(self,forward_init_flows,reverse_init_flows):
- #import pdb
- #pdb.set_trace()
- #rebuild the flows
- forward_init_flows = eval(forward_init_flows)
- reverse_init_flows = eval(reverse_init_flows)
- #stc init action
- self.baseAPI.stc_perform(' ResetConfig -config system1')
- self.baseAPI.stc_init()
- #create project
- project = self.baseAPI.stc_create_project()
- #create sequencer
- seq_handle = self.baseAPI.stc_create('Sequencer -under %s' %(project))
- #create port handle
- forward_port_handle = self.baseAPI.stc_create_port(project)
- reverse_port_handle = self.baseAPI.stc_create_port(project)
- #create forward flow streamblock
- for key in forward_init_flows.keys():
- forward_init_flows[key]['port_handle'] = forward_port_handle
- tmp_test_ip = forward_init_flows[key]['tester_ip']
- tmp_slot = forward_init_flows[key]['send_port'].split('/')[0]
- tmp_port = forward_init_flows[key]['send_port'].split('/')[1]
- self.baseAPI.stc_config_port_location(forward_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
- #create streamblock
- forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = forward_init_flows[key]['port_handle'],
- vlan_tag = forward_init_flows[key]['vlan'],
- ExpectedRxPort = reverse_port_handle,
- srcMac = forward_init_flows[key]['src_mac'],
- dstMac = forward_init_flows[key]['dst_mac'],
- sourceAddr = forward_init_flows[key]['src_ip'],
- destAddr = forward_init_flows[key]['dst_ip'])
- #create reverse flow streamblock
- for key in reverse_init_flows.keys():
- reverse_init_flows[key]['port_handle'] = reverse_port_handle
- tmp_test_ip = reverse_init_flows[key]['tester_ip']
- tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0]
- tmp_port = reverse_init_flows[key]['send_port'].split('/')[1]
- self.baseAPI.stc_config_port_location(reverse_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
- #create streamblock
- reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = reverse_init_flows[key]['port_handle'],
- vlan_tag = reverse_init_flows[key]['vlan'],
- ExpectedRxPort = forward_port_handle,
- srcMac = reverse_init_flows[key]['src_mac'],
- dstMac = reverse_init_flows[key]['dst_mac'],
- sourceAddr = reverse_init_flows[key]['src_ip'],
- destAddr = reverse_init_flows[key]['dst_ip'])
- #Create the RFC 2544 frameloss test
- frameloss_config = self.baseAPI.stc_create('Rfc2544FrameLossConfig -under ',project,
- '-NumOfTrials 1 ',
- '-DurationSeconds 60 ',
- '-LoadUnits PERCENT_LINE_RATE ',
- '-LoadType CUSTOM '
- '-CustomLoadList 100 '
- '-UseExistingStreamBlocks True ',
- '-EnableLearning False ',
- '-FrameSizeIterationMode CUSTOM ',
- '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"',
- '-LatencyType LIFO',
- '-EnableJitterMeasurement TRUE'
- )
- #import pdb
- #pdb.set_trace()
- # list streamblocks
- streamblock_list = '" '
- for key in forward_init_flows.keys():
- streamblock_list = streamblock_list+forward_init_flows[key]['streamblock']+' '
- for key in reverse_init_flows.keys():
- streamblock_list = streamblock_list+reverse_init_flows[key]['streamblock']+' '
- streamblock_list = streamblock_list+'"'
-
- frameloss_sbProfile= self.baseAPI.stc_create('Rfc2544StreamBlockProfile -under '+frameloss_config+' -Active TRUE -LocalActive TRUE')
- self.baseAPI.stc_config(frameloss_sbProfile,'-StreamBlockList '+streamblock_list)
- self.baseAPI.stc_perform('ExpandBenchmarkConfigCommand','-config ',frameloss_config)
-
- #attach the port before testing
- port_list = [ forward_port_handle,reverse_port_handle]
- self.baseAPI.stc_attach_ports(port_list)
-
- #stc apply and begin to sequence test
- self.baseAPI.stc_apply()
- self.baseAPI.stc_perform("SequencerStart")
-
- #wait until complete
- self.baseAPI.stc_waituntilcomplete()
-
- #get result db
- resultsdb = self.baseAPI.stc_get("system1.project.TestResultSetting", "-CurrentResultFileName")
- results_dict = self.baseAPI.stc_perform('QueryResult','-DatabaseConnectionString',resultsdb,'-ResultPath RFC2544FrameLossTestResultDetailedSummaryView')
- #import pdb
- #pdb.set_trace()
- return True,results_dict
-
- def run_rfc2544_latency(self,forward_init_flows,reverse_init_flows):
- pass
-