summaryrefslogtreecommitdiffstats
path: root/func
diff options
context:
space:
mode:
authorzhifeng.jiang <jiang.zhifeng@zte.com.cn>2016-07-03 23:04:40 +0800
committerzhifeng.jiang <jiang.zhifeng@zte.com.cn>2016-07-05 20:05:26 +0800
commit4c7387f811c01bd74b5ae6e0d0cca4bc98d298e8 (patch)
tree3ca8f61a152b196ca65ef526ae9659c0bddbb411 /func
parent1560721b960531785a29d70a572f0ee791234fb0 (diff)
Add some UT test cases and fix pep8 errors for cli and env_setup.
JIRA:QTIP-89 Change-Id: I4a46898071001f679f1a032a560d605dffc8eb9f Signed-off-by: zhifeng.jiang <jiang.zhifeng@zte.com.cn>
Diffstat (limited to 'func')
-rw-r--r--func/cli.py121
-rw-r--r--func/env_setup.py132
2 files changed, 126 insertions, 127 deletions
diff --git a/func/cli.py b/func/cli.py
index 129ab96c..5e8f02cf 100644
--- a/func/cli.py
+++ b/func/cli.py
@@ -15,94 +15,95 @@ from func.spawn_vm import SpawnVM
import argparse
-class cli():
-
- def _getfile(self, filepath):
-
- with open('test_list/'+filepath,'r') as finput:
- _benchmarks=finput.readlines()
- for items in range( len(_benchmarks)):
- _benchmarks[items]=_benchmarks[items].rstrip()
+class cli:
+
+ @staticmethod
+ def _getfile(file_path):
+ with open('test_list/' + file_path, 'r') as fin_put:
+ _benchmarks = fin_put.readlines()
+ for items in range(len(_benchmarks)):
+ _benchmarks[items] = _benchmarks[items].rstrip()
return _benchmarks
- def _getsuite(self, filepath):
+ @staticmethod
+ def _getsuite(file_path):
- return filepath
+ return file_path
- def _checkTestList(self, filename):
+ @staticmethod
+ def _check_test_list(filename):
- if os.path.isfile('test_list/'+filename):
+ if os.path.isfile('test_list/' + filename):
return True
else:
return False
- def _checkLabName(self, labname):
+ @staticmethod
+ def _check_lab_name(lab_name):
- if os.path.isdir('test_cases/'+labname):
+ if os.path.isdir('test_cases/' + lab_name):
return True
else:
return False
- def _get_fname(self,file_name):
+ @staticmethod
+ def _get_f_name(file_name):
return file_name[0: file_name.find('.')]
- def __init__(self):
-
- suite=[]
+ @staticmethod
+ def _parse_args(args):
parser = argparse.ArgumentParser()
- parser.add_argument('-l ', '--lab', help='Name of Lab on which being tested, These can' \
- 'be found in the test_cases/ directory. Please ' \
- 'ensure that you have edited the respective files '\
- 'before using them. For testing other than through Jenkins'\
- ' The user should list default after -l . all the fields in'\
- ' the files are necessary and should be filled')
- parser.add_argument('-f', '--file', help = 'File in test_list with the list of tests. there are three files' \
- '\n compute '\
- '\n storage '\
- '\n network '\
- 'They contain all the tests that will be run. They are listed by suite.' \
- 'Please ensure there are no empty lines')
- args = parser.parse_args()
-
- if not self._checkTestList(args.file):
+ parser.add_argument('-l ', '--lab', help='Name of Lab on which being tested, These can'
+ 'be found in the test_cases/ directory. Please '
+ 'ensure that you have edited the respective files '
+ 'before using them. For testing other than through Jenkins'
+ ' The user should list default after -l . all the fields in'
+ ' the files are necessary and should be filled')
+ parser.add_argument('-f', '--file', help='File in test_list with the list of tests. there are three files'
+ '\n compute '
+ '\n storage '
+ '\n network '
+ 'They contain all the tests that will be run. They are listed by suite.'
+ 'Please ensure there are no empty lines')
+ return parser.parse_args(args)
+
+ def __init__(self, args=sys.argv[1:]):
+
+ suite = []
+ args = self._parse_args(args)
+
+ if not self._check_test_list(args.file):
print '\n\n ERROR: Test File Does not exist in test_list/ please enter correct file \n\n'
sys.exit(0)
- if not self._checkLabName(args.lab):
- print '\n\n You have specified a lab that is not present in test_cases/ please enter correct'\
- ' file. If unsure how to proceed, use -l default.\n\n'
+ if not self._check_lab_name(args.lab):
+ print '\n\n You have specified a lab that is not present in test_cases/ please enter correct \
+ file. If unsure how to proceed, use -l default.\n\n'
sys.exit(0)
benchmarks = self._getfile(args.file)
suite.append(args.file)
- suite=self._getsuite(suite)
- for items in range (len(benchmarks)):
- if (suite and benchmarks):
-
- roles=''
- vm_info=''
- benchmark_details=''
- pip=''
- obj=''
+ suite = self._getsuite(suite)
+ for items in range(len(benchmarks)):
+ if suite and benchmarks:
obj = Env_setup()
- if os.path.isfile('./test_cases/'+args.lab.lower()+'/'+suite[0]+'/' +benchmarks[items]):
- [benchmark, roles, vm_info, benchmark_details, pip, proxy_info] = obj.parse('./test_cases/'
- +args.lab.lower()+'/'+suite[0]+'/'+benchmarks[items])
+ if os.path.isfile('./test_cases/' + args.lab.lower() + '/' + suite[0] + '/' + benchmarks[items]):
+ [benchmark, vm_info, benchmark_details, proxy_info] = \
+ obj.parse('./test_cases/' + args.lab.lower() + '/' + suite[0] + '/' + benchmarks[items])
if len(vm_info) != 0:
- vmObj =''
- vmObj = SpawnVM(vm_info)
- if obj.callpingtest():
- obj.callsshtest()
- obj.updateAnsible()
- dvr = Driver()
- dvr.drive_bench(benchmark,
- obj.roles_dict.items(),
- self._get_fname(benchmarks[items]),
- benchmark_details,
- obj.ip_pw_dict.items(),
- proxy_info)
+ SpawnVM(vm_info)
+ obj.call_ping_test()
+ obj.call_ssh_test()
+ obj.update_ansible()
+ dvr = Driver()
+ dvr.drive_bench(benchmark,
+ obj.roles_dict.items(),
+ self._get_f_name(benchmarks[items]),
+ benchmark_details,
+ obj.ip_pw_dict.items(),
+ proxy_info)
else:
print (benchmarks[items], ' is not a Template in the Directory - \
Enter a Valid file name. or use qtip.py -h for list')
diff --git a/func/env_setup.py b/func/env_setup.py
index c1e2a003..9c0dadb3 100644
--- a/func/env_setup.py
+++ b/func/env_setup.py
@@ -13,19 +13,22 @@ from collections import defaultdict
import yaml
import time
import paramiko
-class Env_setup():
+import socket
+
+
+class Env_setup:
roles_ip_list = [] # ROLE and its corresponding IP address list
ip_pw_list = [] # IP and password, this will be used to ssh
roles_dict = defaultdict(list)
ip_pw_dict = defaultdict(list)
ip_pip_list = []
vm_parameters = defaultdict(list)
- benchmark_details= defaultdict()
+ benchmark_details = defaultdict()
benchmark = ''
def __init__(self):
print '\nParsing class initiated\n'
- self.roles_ip_list[:]=[]
+ self.roles_ip_list[:] = []
self.ip_pw_list[:] = []
self.roles_dict.clear()
self.ip_pw_dict.clear()
@@ -35,41 +38,44 @@ class Env_setup():
self.benchmark_details.clear()
self.benchmark = ''
- def writeTofile(self, role):
- fname2 = open('./data/hosts', 'w')
+ @staticmethod
+ def write_to_file(role):
+ f_name_2 = open('./data/hosts', 'w')
print role.items()
for k in role:
- fname2.write('[' + k + ']\n')
+ f_name_2.write('[' + k + ']\n')
num = len(role[k])
for x in range(num):
- fname2.write(role[k][x] + '\n')
- fname2.close
+ f_name_2.write(role[k][x] + '\n')
+ f_name_2.close()
- def sshtest(self, lister):
- print 'list: ',lister
+ @staticmethod
+ def ssh_test(lister):
+ print 'list: ', lister
for k, v in lister:
- ipvar = k
- pwvar = v
+ ip_var = k
print '\nBeginning SSH Test!\n'
if v != '':
- print ('\nSSH->>>>> {0} {1}\n'.format(k,v))
+ print ('\nSSH->>>>> {0} {1}\n'.format(k, v))
time.sleep(2)
ssh_c = 'ssh-keyscan {0} >> ~/.ssh/known_hosts'.format(k)
os.system(ssh_c)
- ssh_cmd = './data/qtip_creds.sh {0}'.format(ipvar)
+ ssh_cmd = './data/qtip_creds.sh {0}'.format(ip_var)
print ssh_cmd
- res = os.system(ssh_cmd)
+ os.system(ssh_cmd)
for infinity in range(100):
- try :
+ try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(k , key_filename= './data/QtipKey')
+ ssh.connect(k, key_filename='./data/QtipKey')
stdin, stdout, stderr = ssh.exec_command('ls')
print('SSH successful')
+ for line in stdout:
+ print '... ' + line.strip('\n')
break
- except:
- print 'Retrying aSSH'
+ except socket.error:
+ print 'Retrying aSSH %s' % infinity
time.sleep(1)
if v == '':
print ('SSH->>>>>', k)
@@ -79,99 +85,91 @@ class Env_setup():
os.system(ssh_c)
for infinity in range(10):
- try :
+ try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(k, key_filename= './data/QtipKey')
+ ssh.connect(k, key_filename='./data/QtipKey')
stdin, stdout, stderr = ssh.exec_command('ls')
+ print('SSH successful')
+ for line in stdout:
+ print '... ' + line.strip('\n')
break
- except:
- print 'Retrying SSH'
+ except socket.error:
+ print 'Retrying SSH %s' % infinity
+
+ @staticmethod
+ def ping_test(lister):
- def pingtest(self, lister):
- pingFlag = 0
- result = True
for k, v in lister.iteritems():
time.sleep(10)
for val in v:
ipvar = val
ping_cmd = 'ping -D -c1 {0}'.format(ipvar)
- while (os.system(ping_cmd) != 0) &(pingFlag <=20):
+ while os.system(ping_cmd) != 0:
print '\nWaiting for machine\n'
time.sleep(10)
- pingFlag = pingFlag+1
- if pingFlag <= 2:
- print ('\n\n %s is UP \n\n ' % ipvar)
- else:
- result = False
- return result
-
+ print ('\n\n %s is UP \n\n ' % ipvar)
- def GetHostMachineinfo(self, Hosttag):
+ def get_host_machine_info(self, host_tag):
- num = len(Hosttag)
+ num = len(host_tag)
offset = len(self.roles_ip_list)
for x in range(num):
hostlabel = 'machine_' + str(x + 1)
self.roles_ip_list.insert(
- offset, (Hosttag[hostlabel]['role'], Hosttag[hostlabel]['ip']))
+ offset, (host_tag[hostlabel]['role'], host_tag[hostlabel]['ip']))
self.ip_pw_list.insert(
- offset, (Hosttag[hostlabel]['ip'], Hosttag[hostlabel]['pw']))
+ offset, (host_tag[hostlabel]['ip'], host_tag[hostlabel]['pw']))
- def GetVirtualMachineinfo(self, Virtualtag):
+ def get_virtual_machine_info(self, virtual_tag):
- num = len(Virtualtag)
+ num = len(virtual_tag)
for x in range(num):
- hostlabel = 'virtualmachine_' + str(x + 1)
- for k, v in Virtualtag[hostlabel].iteritems():
+ host_label = 'virtualmachine_' + str(x + 1)
+ for k, v in virtual_tag[host_label].iteritems():
self.vm_parameters[k].append(v)
- def GetBenchmarkDetails(self, detail_dic):
+ def get_bench_mark_details(self, detail_dic):
print detail_dic
- for k,v in detail_dic.items():
- self.benchmark_details[k]= v
+ for k, v in detail_dic.items():
+ self.benchmark_details[k] = v
- def parse(self, configfilepath):
+ def parse(self, config_file_path):
try:
- fname = open(configfilepath, 'r+')
- doc = yaml.load(fname)
-# valid_file = validate_yaml.Validate_Yaml(doc)
- fname.close()
- for scenario in doc:
+ f_name = open(config_file_path, 'r+')
+ doc = yaml.load(f_name)
+ f_name.close()
+ if doc['Scenario']['benchmark']:
self.benchmark = doc['Scenario']['benchmark']
if doc['Context']['Virtual_Machines']:
- self.GetVirtualMachineinfo(doc['Context']['Virtual_Machines'])
+ self.get_virtual_machine_info(doc['Context']['Virtual_Machines'])
if doc['Context']['Host_Machines']:
- self.GetHostMachineinfo(doc['Context']['Host_Machines'])
- if doc.get('Scenario',{}).get('benchmark_details',{}):
- self.GetBenchmarkDetails(doc.get('Scenario',{}).get('benchmark_details',{}))
- if 'Proxy_Environment' in doc['Context'].keys():
+ self.get_host_machine_info(doc['Context']['Host_Machines'])
+ if doc.get('Scenario', {}).get('benchmark_details', {}):
+ self.get_bench_mark_details(doc.get('Scenario', {}).get('benchmark_details', {}))
+ if 'Proxy_Environment' in doc['Context'].keys():
self.proxy_info['http_proxy'] = doc['Context']['Proxy_Environment']['http_proxy']
self.proxy_info['https_proxy'] = doc['Context']['Proxy_Environment']['https_proxy']
- self.proxy_info['no_proxy'] = doc['Context']['Proxy_Environment']['no_proxy']
+ self.proxy_info['no_proxy'] = doc['Context']['Proxy_Environment']['no_proxy']
for k, v in self.roles_ip_list:
self.roles_dict[k].append(v)
for k, v in self.ip_pw_list:
self.ip_pw_dict[k].append(v)
return (
self.benchmark,
- self.roles_dict.items(),
self.vm_parameters,
self.benchmark_details.items(),
- self.ip_pw_dict.items(),
self.proxy_info)
-
except KeyboardInterrupt:
- fname.close()
print 'ConfigFile Closed: exiting!'
sys.exit(0)
- def updateAnsible(self):
- self.writeTofile(self.roles_dict)
+ def update_ansible(self):
+ self.write_to_file(self.roles_dict)
- def callpingtest(self):
- self.pingtest(self.roles_dict)
+ def call_ping_test(self):
+ self.ping_test(self.roles_dict)
- def callsshtest(self):
- self.sshtest(self.ip_pw_list)
+ def call_ssh_test(self):
+ self.ssh_test(self.ip_pw_list)