From 8f1101df131a4d3e03b377738507d88b745831c0 Mon Sep 17 00:00:00 2001 From: "Yiting.Li" Date: Tue, 22 Dec 2015 17:11:12 -0800 Subject: Upload the contribution of vstf as bottleneck network framework. End to End Performance test JIRA:BOTTLENECK-29 Change-Id: Ib2c553c8b60d6cda9e7a7b52b737c9139f706ebd Signed-off-by: Yiting.Li --- vstf/vstf/controller/spirent/common/__init__.py | 14 + vstf/vstf/controller/spirent/common/model.py | 462 +++++++++++++++++++++ .../controller/spirent/common/result_analysis.py | 172 ++++++++ 3 files changed, 648 insertions(+) create mode 100755 vstf/vstf/controller/spirent/common/__init__.py create mode 100755 vstf/vstf/controller/spirent/common/model.py create mode 100755 vstf/vstf/controller/spirent/common/result_analysis.py (limited to 'vstf/vstf/controller/spirent/common') diff --git a/vstf/vstf/controller/spirent/common/__init__.py b/vstf/vstf/controller/spirent/common/__init__.py new file mode 100755 index 00000000..0e98d82e --- /dev/null +++ b/vstf/vstf/controller/spirent/common/__init__.py @@ -0,0 +1,14 @@ +# Copyright Huawei Technologies Co., Ltd. 1998-2015. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the License); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. \ No newline at end of file diff --git a/vstf/vstf/controller/spirent/common/model.py b/vstf/vstf/controller/spirent/common/model.py new file mode 100755 index 00000000..511eab40 --- /dev/null +++ b/vstf/vstf/controller/spirent/common/model.py @@ -0,0 +1,462 @@ +#!/usr/bin/python +""" + @author: l00190809 + @group: Huawei Ltd +""" +import re +import copy +import time +import ConfigParser + +fwd = {'single': ['forward'], + 'double': ['forward', 'reverse'] + } +models = ['Tnv'] +direction = ['single', 'double'] +reverse_dict = { + 'forward': 'reverse', + 'reverse': 'forward' +} + + +class BaseModel(object): + def __init__(self, config): + self.config = config + + def _check_model(self): + return self.config['model'] in models + + def _check_virtenv(self): + try: + num = int(self.config['virtenv']) + return num in range(1, 9) + except: + print("[ERROR]The virtenv is not a inter number.") + + def _check_queues(self): + try: + num = int(self.config['queues']) + return num in range(1, 9) + except: + print("[ERROR]The virt queues is not a inter number.") + + @property + def _check_flows(self): + try: + num = int(self.config['flows']) + return num in range(1, 9) + except: + print("[ERROR]The flow is not a inter number.") + + def _check_direct(self): + return self.config['direct'] in direction + + def _check_vlans(self): + return self.config['vlans'] in ['True', 'False'] + + def _check_bind(self): + return True + + def check_parameter_invalid(self): + try: + if self._check_model() and \ + self._check_virtenv() and \ + self._check_queues() and \ + self._check_flows and \ + self._check_direct() and \ + self._check_vlans() and \ + self._check_bind(): + return True + else: + print("[ERROR]Paramter check invalid") + return False + except: + print("[ERROR]Check parameter invalid with unknown reason.") + return False + + +def _get_array_values(irq_array): + proc_list = [] + for i in range(len(irq_array)): + proc_list.append(irq_array[i][1]) + return sorted(proc_list) + + +def check_dict(thread_info, flow): + if thread_info['src_recv_irq'] != flow['src_recv_irq']: + print("[WARN]Flow src_irq process %s not match %s in the table." + % (thread_info['src_recv_irq'], + flow['src_recv_irq'])) + return False + if thread_info['dst_send_irq'] != flow['dst_send_irq']: + print("[WARN]Flow dst_irq process %s not match %s in the table." + % (thread_info['dst_send_irq'], + flow['dst_send_irq'])) + return False + return True + + +def dst_ip_update(flow): + try: + src_dst_ip = flow['dst_ip'] + ip_section = '.'.join(src_dst_ip.split('.')[0:3]) + '.' + number = int(src_dst_ip.split('.')[3]) + new_number = number + 1 + new_dst_ip = ip_section + str(new_number) + flow['dst_ip'] = new_dst_ip + except: + print("[ERROR]dst ip update failed.") + + +def _tranfer_array_to_range(array): + return str(array[0]) + '-' + str(array[-1]) + + +class TnV(BaseModel): + def __init__(self, config): + super(TnV, self).__init__(config) + self.config = config + self.host_instance = None + self.send_instace = None + self.vms = None + self.init_flows = {} + handle = ConfigParser.ConfigParser() + handle.read(self.config['configfile']) + self.handle = handle + + def _get_vms(self): + return self.host_instance.get_libvirt_vms() + + def flow_match(self): + _queues = int(self.config['queues']) + _virtenv = int(self.config['virtenv']) + _flows = int(self.config['flows']) + return _flows == _queues * _virtenv + + def match_virt_env(self): + try: + self.vms = self._get_vms() + return len(self.vms) == int(self.config['virtenv']) + except: + print("[ERROR]vms or containers number is equal to virtenv.") + return False + + @property + def match_flows_and_nic(self): + # get src_nic + for section in ['send', 'recv']: + nic = self._get_nic_from_file(section, 'nic') + try: + irq_proc = self.host_instance.get_nic_interrupt_proc(nic) + return int(self.config['flows']) == len(irq_proc) + except: + print("[ERROR]match flow with nic interrupt failed.") + return False + + def _get_nic_irq_proc(self, nic): + return self.host_instance.get_nic_interrupt_proc(nic) + + def _get_nic_from_file(self, section, column): + return self.handle.get(section, column) + + def _get_range(self, section, column): + try: + info = self.handle.get(section, column) + return info.split(' ') + except: + print("[ERROR]Get mac failed.") + return False + + def check_mac_valid(self): + flag = True + try: + for option in ['send', 'recv']: + info = self.handle.get(option, 'macs') + macs = info.split() + if len(macs) != int(self.config['virtenv']) or macs == []: + print("[ERROR]The macs number is not equal to vms or containers.") + return False + for mac in macs: + # check mac valid + if re.match(r'..:..:..:..:..:..', mac): + continue + else: + print("[ERROR]mac %s invalid" % mac) + flag = False + break + if not flag: + break + return flag + except: + print("[ERROR]parse macs failed.") + return False + + def check_vlan_valid(self): + flag = True + for direct in ['send', 'recv']: + vlans = self.handle.get(direct, 'vlans').split() + if len(vlans) != int(self.config['virtenv']): + print("[ERROR]vlan un config") + return False + for vlan in vlans: + if int(vlan) <= 1 or int(vlan) >= 4095: + flag = False + break + return flag + + @property + def check_logic_invalid(self): + return self.flow_match() and self.match_virt_env() and \ + self.match_flows_and_nic and self.check_mac_valid() and \ + self.check_vlan_valid() + + @property + def read_flow_init(self): + # The + temp_flow = {} + src_macs = self._get_range('send', 'macs') + dst_macs = self._get_range('recv', 'macs') + src_vlan = self._get_range('send', 'vlans') + dst_vlan = self._get_range('recv', 'vlans') + src_nic = self._get_nic_from_file('send', 'nic') + dst_nic = self._get_nic_from_file('recv', 'nic') + src_nic_irq = _get_array_values(self._get_nic_irq_proc(src_nic)) + dst_nic_irq = _get_array_values(self._get_nic_irq_proc(dst_nic)) + src_ip_sections = self._get_range('send', 'ip_sections') + dst_ip_sections = self._get_range('recv', 'ip_sections') + send_port = self._get_nic_from_file('send', 'port') + recv_port = self._get_nic_from_file('recv', 'port') + temp_flow['tester_ip'] = self._get_nic_from_file('common', 'tester_ip') + vlan = src_vlan + avg_flow = int(self.config['flows']) / int(self.config['virtenv']) + # build the main dictionary + for _direct in sorted(fwd[self.config['direct']]): + i = 0 + j = 0 + temp_flow['direct'] = _direct + temp_flow['send_port'] = send_port + temp_flow['recv_port'] = recv_port + + for _vm in sorted(self.vms): + vlan_id = { + 'True': vlan[i], + 'False': None} + temp_flow['virt'] = _vm + _vm_info = self.host_instance.get_vm_info(_vm) + temp_flow['qemu_proc'] = _vm_info['main_pid'] + # temp_flow['qemu_thread'] = _vm_info['qemu_thread'] + temp_flow['mem_numa'] = _vm_info['mem_numa'] + # temp_flow['vhost_thread'] = _vm_info['vhost_thread'] + + temp_flow['src_mac'] = src_macs[i] + temp_flow['dst_mac'] = dst_macs[i] + temp_flow['vlan'] = vlan_id[self.config['vlans']] + src_ip = src_ip_sections[i] + dst_ip = dst_ip_sections[i] + temp_flow['src_ip'] = src_ip + temp_flow['dst_ip'] = dst_ip + vm_index = sorted(self.vms).index(_vm) + for _queue in range(1, int(self.config['queues']) + 1): + # flow info + temp_flow['queue'] = _queue + # fwd thread + + temp_flow['qemu_thread_list'] = _vm_info['qemu_thread'] + forward_core = { + "forward": _vm_info['qemu_thread'][_queue + avg_flow * vm_index], + "reverse": _vm_info['qemu_thread'][_queue + avg_flow * vm_index + int(self.config['flows'])] + } + temp_flow['fwd_thread'] = forward_core[_direct] + + temp_flow['fwd_vhost'] = None + # nic interrupts info + temp_flow['src_recv_irq'] = src_nic_irq[j] + temp_flow['src_nic'] = src_nic + temp_flow['dst_send_irq'] = dst_nic_irq[j] + temp_flow['dst_nic'] = dst_nic + # above all + j += 1 + self.init_flows[_direct + '_' + _vm + '_' + str(_queue)] = copy.deepcopy(temp_flow) + i += 1 + src_nic_irq, dst_nic_irq = dst_nic_irq, src_nic_irq + vlan = dst_vlan + send_port, recv_port = recv_port, send_port + src_nic, dst_nic = dst_nic, src_nic + src_macs, dst_macs = dst_macs, src_macs + src_ip_sections, dst_ip_sections = dst_ip_sections, src_ip_sections + # return sorted(self.init_flows.iteritems(), key=lambda d:d[0]) + return self.init_flows + + def mac_learning(self, flowa, flowb): + flowa = str(flowa) + flowb = str(flowb) + ret = self.send_instace.mac_learning(flowa, flowb) + return ret + + def send_packet(self, flow): + flow = str(flow) + # return a stream block handle + return self.send_instace.send_packet(flow) + + def stop_flow(self, streamblock, flow): + flow = str(flow) + return self.send_instace.stop_flow(streamblock, flow) + + def catch_thread_info(self): + return self.host_instance.catch_thread_info() + + def set_thread2flow(self, thread_info, flow): + flow['fwd_vhost'] = thread_info['fwd_vhost'] + return True + + @property + def flow_build(self): + for _direct in fwd[self.config['direct']]: + for _vm in self.vms: + for _queue in range(1, int(self.config['queues']) + 1): + i = 0 + while i < 50: + try: + i += 1 + thread_info = None + self.mac_learning(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)], + self.init_flows[reverse_dict[_direct] + '_' + _vm + '_' + str(_queue)]) + streamblock = self.send_packet(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]) + time.sleep(1) + result, thread_info = self.catch_thread_info() + thread_info = eval(thread_info) + self.stop_flow(streamblock, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]) + time.sleep(1) + if not result: + print("[ERROR]Catch the thread info failed.") + break + except: + print("[ERROR]send flow failed error or get host thread info failed.") + + # compare the got thread info to + if check_dict(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]): + self.set_thread2flow(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]) + print("[INFO]Flow %s_%s_%s : fwd_vhost %s src_recv_irq %s dst_send_irq %s" + % (_direct, _vm, _queue, thread_info['fwd_vhost'], thread_info['src_recv_irq'], + thread_info['dst_send_irq'])) + print("%s" % (self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])) + break + else: + dst_ip_update(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]) + return self.init_flows + + def affinity_bind(self, aff_strategy): + # get the forward cores + qemu_list = [] + qemu_other = [] + src_vhost = [] + dst_vhost = [] + src_irq = [] + dst_irq = [] + + # recognize the thread id + for flowname in sorted(self.init_flows.keys()): + tmp_thread = self.init_flows[flowname]['fwd_thread'] + qemu_other = qemu_other + copy.deepcopy(self.init_flows[flowname]['qemu_thread_list']) + qemu_list.append(tmp_thread) + if self.init_flows[flowname]['direct'] == 'forward': + dst_vhost.append(self.init_flows[flowname]['fwd_vhost']) + src_irq.append(self.init_flows[flowname]['src_recv_irq']) + dst_irq.append(self.init_flows[flowname]['dst_send_irq']) + elif self.init_flows[flowname]['direct'] == 'reverse': + src_vhost.append(self.init_flows[flowname]['fwd_vhost']) + dst_irq.append(self.init_flows[flowname]['src_recv_irq']) + src_irq.append(self.init_flows[flowname]['dst_send_irq']) + + qemu_list = sorted({}.fromkeys(qemu_list).keys()) + src_vhost = sorted({}.fromkeys(src_vhost).keys()) + dst_vhost = sorted({}.fromkeys(dst_vhost).keys()) + src_irq = sorted({}.fromkeys(src_irq).keys()) + dst_irq = sorted({}.fromkeys(dst_irq).keys()) + + # get the qemu thread except the forward core + qemu_other = sorted({}.fromkeys(qemu_other).keys()) + for i in qemu_list: + qemu_other.remove(i) + # get the bind strategy + handle = ConfigParser.ConfigParser() + handle.read(self.config['strategyfile']) + try: + qemu_numa = handle.get('strategy' + self.config['strategy'], 'qemu_numa') + src_vhost_numa = handle.get('strategy' + self.config['strategy'], 'src_vhost_numa') + dst_vhost_numa = handle.get('strategy' + self.config['strategy'], 'dst_vhost_numa') + src_irq_numa = handle.get('strategy' + self.config['strategy'], 'src_irq_numa') + dst_irq_numa = handle.get('strategy' + self.config['strategy'], 'dst_irq_numa') + loan_numa = handle.get('strategy' + self.config['strategy'], 'loan_numa') + except: + print("[ERROR]Parse the strategy file failed or get the options failed.") + + for value in [qemu_numa, src_vhost_numa, dst_vhost_numa, src_irq_numa, dst_irq_numa, loan_numa]: + if value is not None or value == '': + raise ValueError('some option in the strategy file is none.') + # cores mapping thread + numa_topo = self.host_instance.get_numa_core() + numa_topo = eval(numa_topo) + # first check the cores number + + # order src_irq dst_irq src_vhost dst_vhost qemu_list + for node in numa_topo.keys(): + numa_topo[node]['process'] = [] + if 'node' + src_irq_numa == node: + numa_topo[node]['process'] = numa_topo[node]['process'] + src_irq + if 'node' + dst_irq_numa == node: + numa_topo[node]['process'] = numa_topo[node]['process'] + dst_irq + if 'node' + src_vhost_numa == node: + numa_topo[node]['process'] = numa_topo[node]['process'] + src_vhost + if 'node' + dst_vhost_numa == node: + numa_topo[node]['process'] = numa_topo[node]['process'] + dst_vhost + if 'node' + qemu_numa == node: + numa_topo[node]['process'] = numa_topo[node]['process'] + qemu_list + loan_cores = '' + for node in numa_topo.keys(): + if len(numa_topo[node]['process']) > len(numa_topo[node]['phy_cores']): + # length distance + diff = len(numa_topo[node]['process']) - len(numa_topo[node]['phy_cores']) + # first deep copy + numa_topo['node' + loan_numa]['process'] = numa_topo['node' + loan_numa]['process'] + copy.deepcopy( + numa_topo[node]['process'][-diff:]) + cores_str = _tranfer_array_to_range(numa_topo['node' + loan_numa]['phy_cores'][diff:]) + loan_cores = ','.join([loan_cores, cores_str]) + numa_topo[node]['process'] = numa_topo[node]['process'][0:-diff] + loan_cores = loan_cores[1:] + loan_bind_list = {} + for proc_loan in qemu_other: + loan_bind_list[proc_loan] = loan_cores + + bind_list = {} + for node in numa_topo.keys(): + for i in range(len(numa_topo[node]['process'])): + bind_list[numa_topo[node]['process'][i]] = str(numa_topo[node]['phy_cores'][i]) + bind_list.update(loan_bind_list) + for key in bind_list.keys(): + self.host_instance.bind_cpu(bind_list[key], key) + print bind_list + return True + + def testrun(self, suite): + global forward_init_flows, reverse_init_flows + try: + forward_init_flows = {} + reverse_init_flows = {} + for key in self.init_flows.keys(): + if self.init_flows[key]['direct'] == "forward": + forward_init_flows[key] = self.init_flows[key] + elif self.init_flows[key]['direct'] == "reverse": + reverse_init_flows[key] = self.init_flows[key] + forward_init_flows = str(forward_init_flows) + reverse_init_flows = str(reverse_init_flows) + except: + print("[ERROR]init the forward and reverse flow failed.") + + if suite == "throughput": + print("[INFO]!!!!!!!!!!!!!!!Now begin to throughput test") + ret, result = self.send_instace.run_rfc2544_throughput(forward_init_flows, reverse_init_flows) + elif suite == "frameloss": + print("[INFO]!!!!!!!!!!!1!!!Now begin to frameloss test") + ret, result = self.send_instace.run_rfc2544_frameloss(forward_init_flows, reverse_init_flows) + return ret, result diff --git a/vstf/vstf/controller/spirent/common/result_analysis.py b/vstf/vstf/controller/spirent/common/result_analysis.py new file mode 100755 index 00000000..162e3888 --- /dev/null +++ b/vstf/vstf/controller/spirent/common/result_analysis.py @@ -0,0 +1,172 @@ +#!/usr/bin/python + +import re + + +def getResultColumn(data_dict): + column_string = data_dict['Columns'] + return column_string.strip('{}').split() + + +def getResult(data_dict): + result_string = data_dict['Output'] + result_array = result_string.split('} {') + result = [] + for line in result_array: + result.append(line.split()) + return result + + +def restrucData(data_string): + try: + data_dict = {} + p = re.compile('-Columns.*-Output') + data_dict['Columns'] = p.findall(data_string)[0].strip('-Columns {} -Output') + p = re.compile('-Output.*-State') + data_dict['Output'] = p.findall(data_string)[0].strip('-Output {} -State') + if data_dict['Columns'] is not None or data_dict['Output'] is not None: + return False, None + return True, data_dict + except: + print("[ERROR]Find the column name or the output result failed.") + + +def framelossData(column, perfdata): + column_name_dict = { + 'TrialNumber': 0, + 'Id': 1, + 'FrameSize': 3, + 'TxFrameCount': 9, + 'RxFrameCount': 10, + 'PercentLoss(%s)': 12, + 'MinimumLatency(us)': 17, + 'MaximumLatency(us)': 18, + 'AverageLatency(us)': 19, + 'MinimumJitter(us)': 20, + 'MaximumJitter(us)': 21, + 'AverageJitter(us)': 22, + } + # get the column array + column_array = [ + column[column_name_dict['FrameSize']], + 'ForwardingRate(Mpps)', + column[column_name_dict['TxFrameCount']], + column[column_name_dict['RxFrameCount']], + column[column_name_dict['PercentLoss(%s)']], + column[column_name_dict['AverageLatency(us)']], + column[column_name_dict['MinimumLatency(us)']], + column[column_name_dict['MaximumLatency(us)']], + column[column_name_dict['AverageJitter(us)']], + column[column_name_dict['MinimumJitter(us)']], + column[column_name_dict['MaximumJitter(us)']] + ] + data_array = [] + for line in perfdata: + line_options = [ + # line[column_name_dict['TrialNumber']], + # line[column_name_dict['Id']], + line[column_name_dict['FrameSize']], + str(float(line[column_name_dict['RxFrameCount']]) / 60 / 1000000), + line[column_name_dict['TxFrameCount']], + line[column_name_dict['RxFrameCount']], + line[column_name_dict['PercentLoss(%s)']], + line[column_name_dict['AverageLatency(us)']], + line[column_name_dict['MinimumLatency(us)']], + line[column_name_dict['MaximumLatency(us)']], + line[column_name_dict['AverageJitter(us)']], + line[column_name_dict['MinimumJitter(us)']], + line[column_name_dict['MaximumJitter(us)']] + ] + data_array.append(line_options) + return [column_array, data_array] + + +class analysis(object): + def __init__(self): + pass + + def analyseResult(self, suite, column, perfdata): + """ + :type self: object + """ + global data_array, column_array + if suite == 'throughput': + [column_array, data_array] = self.throughputData(column, perfdata) + elif suite == 'frameloss': + [column_array, data_array] = self.framelossData(column, perfdata) + elif suite == 'latency': + self.latencyData(column, perfdata) + else: + return None + for line in data_array: + print line + return [column_array, data_array] + + def throughputData(self, column, perfdata): + column_name_dict = { + 'TrialNumber': 0, + 'Id': 1, + 'FrameSize': 3, + 'Load(%)': 6, + 'Result': 8, + 'TxFrameCount': 12, + 'RxFrameCount': 13, + 'ForwardingRate(mpps)': 17, + 'MinimumLatency(us)': 18, + 'MaximumLatency(us)': 19, + 'AverageLatency(us)': 20, + 'MinimumJitter(us)': 21, + 'MaximumJitter(us)': 22, + 'AverageJitter(us)': 23 + } + column_array = {column[column_name_dict['FrameSize']], + column[column_name_dict['Load(%)']], + column[column_name_dict['Result']], + 'ForwardingRate(mpps)', + column[column_name_dict['TxFrameCount']], + column[column_name_dict['RxFrameCount']], + column[column_name_dict['AverageLatency(us)']], + column[column_name_dict['MinimumLatency(us)']], + column[column_name_dict['MaximumLatency(us)']], + column[column_name_dict['AverageJitter(us)']], + column[column_name_dict['MinimumJitter(us)']], + column[column_name_dict['MaximumJitter(us)']]} + data_array = [] + for line in perfdata: + if line[column_name_dict['Result']] == 'Passed': + line_options = [ + # line[column_name_dict['TrialNumber']], + # line[column_name_dict['Id']], + line[column_name_dict['FrameSize']], + line[column_name_dict['Load(%)']], + line[column_name_dict['Result']], + str(float(line[column_name_dict['ForwardingRate(mpps)']]) / 1000000), + line[column_name_dict['TxFrameCount']], + line[column_name_dict['RxFrameCount']], + line[column_name_dict['AverageLatency(us)']], + line[column_name_dict['MinimumLatency(us)']], + line[column_name_dict['MaximumLatency(us)']], + line[column_name_dict['AverageJitter(us)']], + line[column_name_dict['MinimumJitter(us)']], + line[column_name_dict['MaximumJitter(us)']]] + else: + continue + data_array.append(line_options) + # delete the redundant test data + delete_index = [] + new_data_array = [] + for ele in range(len(data_array) - 1): + if data_array[ele][0] == data_array[ele + 1][0]: + delete_index.append(ele) + + for num in len(data_array): + if num not in delete_index: + new_data_array.append(data_array[num]) + + return column_array, new_data_array + + def latencyData(self, column, perfdata): + pass + + +analysis_instance = analysis() -- cgit 1.2.3-korg