diff options
author | zhifeng.jiang <jiang.zhifeng@zte.com.cn> | 2016-07-03 23:04:40 +0800 |
---|---|---|
committer | zhifeng.jiang <jiang.zhifeng@zte.com.cn> | 2016-07-05 20:05:26 +0800 |
commit | 4c7387f811c01bd74b5ae6e0d0cca4bc98d298e8 (patch) | |
tree | 3ca8f61a152b196ca65ef526ae9659c0bddbb411 /func | |
parent | 1560721b960531785a29d70a572f0ee791234fb0 (diff) |
Add some UT test cases and fix pep8 errors for cli and env_setup.
JIRA:QTIP-89
Change-Id: I4a46898071001f679f1a032a560d605dffc8eb9f
Signed-off-by: zhifeng.jiang <jiang.zhifeng@zte.com.cn>
Diffstat (limited to 'func')
-rw-r--r-- | func/cli.py | 121 | ||||
-rw-r--r-- | func/env_setup.py | 132 |
2 files changed, 126 insertions, 127 deletions
diff --git a/func/cli.py b/func/cli.py index 129ab96c..5e8f02cf 100644 --- a/func/cli.py +++ b/func/cli.py @@ -15,94 +15,95 @@ from func.spawn_vm import SpawnVM import argparse -class cli(): - - def _getfile(self, filepath): - - with open('test_list/'+filepath,'r') as finput: - _benchmarks=finput.readlines() - for items in range( len(_benchmarks)): - _benchmarks[items]=_benchmarks[items].rstrip() +class cli: + + @staticmethod + def _getfile(file_path): + with open('test_list/' + file_path, 'r') as fin_put: + _benchmarks = fin_put.readlines() + for items in range(len(_benchmarks)): + _benchmarks[items] = _benchmarks[items].rstrip() return _benchmarks - def _getsuite(self, filepath): + @staticmethod + def _getsuite(file_path): - return filepath + return file_path - def _checkTestList(self, filename): + @staticmethod + def _check_test_list(filename): - if os.path.isfile('test_list/'+filename): + if os.path.isfile('test_list/' + filename): return True else: return False - def _checkLabName(self, labname): + @staticmethod + def _check_lab_name(lab_name): - if os.path.isdir('test_cases/'+labname): + if os.path.isdir('test_cases/' + lab_name): return True else: return False - def _get_fname(self,file_name): + @staticmethod + def _get_f_name(file_name): return file_name[0: file_name.find('.')] - def __init__(self): - - suite=[] + @staticmethod + def _parse_args(args): parser = argparse.ArgumentParser() - parser.add_argument('-l ', '--lab', help='Name of Lab on which being tested, These can' \ - 'be found in the test_cases/ directory. Please ' \ - 'ensure that you have edited the respective files '\ - 'before using them. For testing other than through Jenkins'\ - ' The user should list default after -l . all the fields in'\ - ' the files are necessary and should be filled') - parser.add_argument('-f', '--file', help = 'File in test_list with the list of tests. there are three files' \ - '\n compute '\ - '\n storage '\ - '\n network '\ - 'They contain all the tests that will be run. They are listed by suite.' \ - 'Please ensure there are no empty lines') - args = parser.parse_args() - - if not self._checkTestList(args.file): + parser.add_argument('-l ', '--lab', help='Name of Lab on which being tested, These can' + 'be found in the test_cases/ directory. Please ' + 'ensure that you have edited the respective files ' + 'before using them. For testing other than through Jenkins' + ' The user should list default after -l . all the fields in' + ' the files are necessary and should be filled') + parser.add_argument('-f', '--file', help='File in test_list with the list of tests. there are three files' + '\n compute ' + '\n storage ' + '\n network ' + 'They contain all the tests that will be run. They are listed by suite.' + 'Please ensure there are no empty lines') + return parser.parse_args(args) + + def __init__(self, args=sys.argv[1:]): + + suite = [] + args = self._parse_args(args) + + if not self._check_test_list(args.file): print '\n\n ERROR: Test File Does not exist in test_list/ please enter correct file \n\n' sys.exit(0) - if not self._checkLabName(args.lab): - print '\n\n You have specified a lab that is not present in test_cases/ please enter correct'\ - ' file. If unsure how to proceed, use -l default.\n\n' + if not self._check_lab_name(args.lab): + print '\n\n You have specified a lab that is not present in test_cases/ please enter correct \ + file. If unsure how to proceed, use -l default.\n\n' sys.exit(0) benchmarks = self._getfile(args.file) suite.append(args.file) - suite=self._getsuite(suite) - for items in range (len(benchmarks)): - if (suite and benchmarks): - - roles='' - vm_info='' - benchmark_details='' - pip='' - obj='' + suite = self._getsuite(suite) + for items in range(len(benchmarks)): + if suite and benchmarks: obj = Env_setup() - if os.path.isfile('./test_cases/'+args.lab.lower()+'/'+suite[0]+'/' +benchmarks[items]): - [benchmark, roles, vm_info, benchmark_details, pip, proxy_info] = obj.parse('./test_cases/' - +args.lab.lower()+'/'+suite[0]+'/'+benchmarks[items]) + if os.path.isfile('./test_cases/' + args.lab.lower() + '/' + suite[0] + '/' + benchmarks[items]): + [benchmark, vm_info, benchmark_details, proxy_info] = \ + obj.parse('./test_cases/' + args.lab.lower() + '/' + suite[0] + '/' + benchmarks[items]) if len(vm_info) != 0: - vmObj ='' - vmObj = SpawnVM(vm_info) - if obj.callpingtest(): - obj.callsshtest() - obj.updateAnsible() - dvr = Driver() - dvr.drive_bench(benchmark, - obj.roles_dict.items(), - self._get_fname(benchmarks[items]), - benchmark_details, - obj.ip_pw_dict.items(), - proxy_info) + SpawnVM(vm_info) + obj.call_ping_test() + obj.call_ssh_test() + obj.update_ansible() + dvr = Driver() + dvr.drive_bench(benchmark, + obj.roles_dict.items(), + self._get_f_name(benchmarks[items]), + benchmark_details, + obj.ip_pw_dict.items(), + proxy_info) else: print (benchmarks[items], ' is not a Template in the Directory - \ Enter a Valid file name. or use qtip.py -h for list') diff --git a/func/env_setup.py b/func/env_setup.py index c1e2a003..9c0dadb3 100644 --- a/func/env_setup.py +++ b/func/env_setup.py @@ -13,19 +13,22 @@ from collections import defaultdict import yaml import time import paramiko -class Env_setup(): +import socket + + +class Env_setup: roles_ip_list = [] # ROLE and its corresponding IP address list ip_pw_list = [] # IP and password, this will be used to ssh roles_dict = defaultdict(list) ip_pw_dict = defaultdict(list) ip_pip_list = [] vm_parameters = defaultdict(list) - benchmark_details= defaultdict() + benchmark_details = defaultdict() benchmark = '' def __init__(self): print '\nParsing class initiated\n' - self.roles_ip_list[:]=[] + self.roles_ip_list[:] = [] self.ip_pw_list[:] = [] self.roles_dict.clear() self.ip_pw_dict.clear() @@ -35,41 +38,44 @@ class Env_setup(): self.benchmark_details.clear() self.benchmark = '' - def writeTofile(self, role): - fname2 = open('./data/hosts', 'w') + @staticmethod + def write_to_file(role): + f_name_2 = open('./data/hosts', 'w') print role.items() for k in role: - fname2.write('[' + k + ']\n') + f_name_2.write('[' + k + ']\n') num = len(role[k]) for x in range(num): - fname2.write(role[k][x] + '\n') - fname2.close + f_name_2.write(role[k][x] + '\n') + f_name_2.close() - def sshtest(self, lister): - print 'list: ',lister + @staticmethod + def ssh_test(lister): + print 'list: ', lister for k, v in lister: - ipvar = k - pwvar = v + ip_var = k print '\nBeginning SSH Test!\n' if v != '': - print ('\nSSH->>>>> {0} {1}\n'.format(k,v)) + print ('\nSSH->>>>> {0} {1}\n'.format(k, v)) time.sleep(2) ssh_c = 'ssh-keyscan {0} >> ~/.ssh/known_hosts'.format(k) os.system(ssh_c) - ssh_cmd = './data/qtip_creds.sh {0}'.format(ipvar) + ssh_cmd = './data/qtip_creds.sh {0}'.format(ip_var) print ssh_cmd - res = os.system(ssh_cmd) + os.system(ssh_cmd) for infinity in range(100): - try : + try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(k , key_filename= './data/QtipKey') + ssh.connect(k, key_filename='./data/QtipKey') stdin, stdout, stderr = ssh.exec_command('ls') print('SSH successful') + for line in stdout: + print '... ' + line.strip('\n') break - except: - print 'Retrying aSSH' + except socket.error: + print 'Retrying aSSH %s' % infinity time.sleep(1) if v == '': print ('SSH->>>>>', k) @@ -79,99 +85,91 @@ class Env_setup(): os.system(ssh_c) for infinity in range(10): - try : + try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(k, key_filename= './data/QtipKey') + ssh.connect(k, key_filename='./data/QtipKey') stdin, stdout, stderr = ssh.exec_command('ls') + print('SSH successful') + for line in stdout: + print '... ' + line.strip('\n') break - except: - print 'Retrying SSH' + except socket.error: + print 'Retrying SSH %s' % infinity + + @staticmethod + def ping_test(lister): - def pingtest(self, lister): - pingFlag = 0 - result = True for k, v in lister.iteritems(): time.sleep(10) for val in v: ipvar = val ping_cmd = 'ping -D -c1 {0}'.format(ipvar) - while (os.system(ping_cmd) != 0) &(pingFlag <=20): + while os.system(ping_cmd) != 0: print '\nWaiting for machine\n' time.sleep(10) - pingFlag = pingFlag+1 - if pingFlag <= 2: - print ('\n\n %s is UP \n\n ' % ipvar) - else: - result = False - return result - + print ('\n\n %s is UP \n\n ' % ipvar) - def GetHostMachineinfo(self, Hosttag): + def get_host_machine_info(self, host_tag): - num = len(Hosttag) + num = len(host_tag) offset = len(self.roles_ip_list) for x in range(num): hostlabel = 'machine_' + str(x + 1) self.roles_ip_list.insert( - offset, (Hosttag[hostlabel]['role'], Hosttag[hostlabel]['ip'])) + offset, (host_tag[hostlabel]['role'], host_tag[hostlabel]['ip'])) self.ip_pw_list.insert( - offset, (Hosttag[hostlabel]['ip'], Hosttag[hostlabel]['pw'])) + offset, (host_tag[hostlabel]['ip'], host_tag[hostlabel]['pw'])) - def GetVirtualMachineinfo(self, Virtualtag): + def get_virtual_machine_info(self, virtual_tag): - num = len(Virtualtag) + num = len(virtual_tag) for x in range(num): - hostlabel = 'virtualmachine_' + str(x + 1) - for k, v in Virtualtag[hostlabel].iteritems(): + host_label = 'virtualmachine_' + str(x + 1) + for k, v in virtual_tag[host_label].iteritems(): self.vm_parameters[k].append(v) - def GetBenchmarkDetails(self, detail_dic): + def get_bench_mark_details(self, detail_dic): print detail_dic - for k,v in detail_dic.items(): - self.benchmark_details[k]= v + for k, v in detail_dic.items(): + self.benchmark_details[k] = v - def parse(self, configfilepath): + def parse(self, config_file_path): try: - fname = open(configfilepath, 'r+') - doc = yaml.load(fname) -# valid_file = validate_yaml.Validate_Yaml(doc) - fname.close() - for scenario in doc: + f_name = open(config_file_path, 'r+') + doc = yaml.load(f_name) + f_name.close() + if doc['Scenario']['benchmark']: self.benchmark = doc['Scenario']['benchmark'] if doc['Context']['Virtual_Machines']: - self.GetVirtualMachineinfo(doc['Context']['Virtual_Machines']) + self.get_virtual_machine_info(doc['Context']['Virtual_Machines']) if doc['Context']['Host_Machines']: - self.GetHostMachineinfo(doc['Context']['Host_Machines']) - if doc.get('Scenario',{}).get('benchmark_details',{}): - self.GetBenchmarkDetails(doc.get('Scenario',{}).get('benchmark_details',{})) - if 'Proxy_Environment' in doc['Context'].keys(): + self.get_host_machine_info(doc['Context']['Host_Machines']) + if doc.get('Scenario', {}).get('benchmark_details', {}): + self.get_bench_mark_details(doc.get('Scenario', {}).get('benchmark_details', {})) + if 'Proxy_Environment' in doc['Context'].keys(): self.proxy_info['http_proxy'] = doc['Context']['Proxy_Environment']['http_proxy'] self.proxy_info['https_proxy'] = doc['Context']['Proxy_Environment']['https_proxy'] - self.proxy_info['no_proxy'] = doc['Context']['Proxy_Environment']['no_proxy'] + self.proxy_info['no_proxy'] = doc['Context']['Proxy_Environment']['no_proxy'] for k, v in self.roles_ip_list: self.roles_dict[k].append(v) for k, v in self.ip_pw_list: self.ip_pw_dict[k].append(v) return ( self.benchmark, - self.roles_dict.items(), self.vm_parameters, self.benchmark_details.items(), - self.ip_pw_dict.items(), self.proxy_info) - except KeyboardInterrupt: - fname.close() print 'ConfigFile Closed: exiting!' sys.exit(0) - def updateAnsible(self): - self.writeTofile(self.roles_dict) + def update_ansible(self): + self.write_to_file(self.roles_dict) - def callpingtest(self): - self.pingtest(self.roles_dict) + def call_ping_test(self): + self.ping_test(self.roles_dict) - def callsshtest(self): - self.sshtest(self.ip_pw_list) + def call_ssh_test(self): + self.ssh_test(self.ip_pw_list) |