summaryrefslogtreecommitdiffstats
path: root/testcases/testcase.py
diff options
context:
space:
mode:
authorMartin Klozik <martinx.klozik@intel.com>2017-11-15 08:24:29 +0000
committerGerrit Code Review <gerrit@opnfv.org>2017-11-15 08:24:29 +0000
commit66a2773d89c689d1b8740aa2388164582e9ccb6c (patch)
tree6d427256960f314d8ac7152f7497f3f6ea033811 /testcases/testcase.py
parent31770a64cd8a5c40ee3657ac97e87a900f7aeca5 (diff)
parentb1534957e463b5e34957a8d48ce5c6b0552ffbb4 (diff)
Merge "teststeps: Improvements and bugfixing of teststeps"
Diffstat (limited to 'testcases/testcase.py')
-rw-r--r--testcases/testcase.py114
1 files changed, 74 insertions, 40 deletions
diff --git a/testcases/testcase.py b/testcases/testcase.py
index 3d9ffe4b..37cdefa6 100644
--- a/testcases/testcase.py
+++ b/testcases/testcase.py
@@ -68,7 +68,6 @@ class TestCase(object):
self._loadgen = None
self._output_file = None
self._tc_results = None
- self._settings_original = {}
self._settings_paths_modified = False
self._testcast_run_time = None
self._versions = []
@@ -76,21 +75,18 @@ class TestCase(object):
self._step_check = False # by default don't check result for step driven testcases
self._step_vnf_list = {}
self._step_result = []
+ self._step_result_mapping = {}
self._step_status = None
+ self._step_send_traffic = False # indication if send_traffic was called within test steps
self._testcase_run_time = None
- # store all GUEST_ specific settings to keep original values before their expansion
- for key in S.__dict__:
- if key.startswith('GUEST_'):
- self._settings_original[key] = S.getValue(key)
-
- self._update_settings('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH')))
- self._update_settings('VNF', cfg.get('VNF', S.getValue('VNF')))
- self._update_settings('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN')))
+ S.setValue('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH')))
+ S.setValue('VNF', cfg.get('VNF', S.getValue('VNF')))
+ S.setValue('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN')))
test_params = copy.deepcopy(S.getValue('TEST_PARAMS'))
tc_test_params = cfg.get('Parameters', S.getValue('TEST_PARAMS'))
test_params = merge_spec(test_params, tc_test_params)
- self._update_settings('TEST_PARAMS', test_params)
+ S.setValue('TEST_PARAMS', test_params)
S.check_test_params()
# override all redefined GUEST_ values to have them expanded correctly
@@ -109,6 +105,15 @@ class TestCase(object):
self.desc = cfg.get('Description', 'No description given.')
self.test = cfg.get('TestSteps', None)
+ # log testcase name and details
+ tmp_desc = functions.format_description(self.desc, 50)
+ self._logger.info('############################################################')
+ self._logger.info('# Test: %s', self.name)
+ self._logger.info('# Details: %s', tmp_desc[0])
+ for i in range(1, len(tmp_desc)):
+ self._logger.info('# %s', tmp_desc[i])
+ self._logger.info('############################################################')
+
bidirectional = S.getValue('TRAFFIC')['bidir']
if not isinstance(S.getValue('TRAFFIC')['bidir'], str):
raise TypeError(
@@ -162,6 +167,7 @@ class TestCase(object):
self._traffic['l2'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L2')
self._traffic['l3'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L3')
self._traffic['l4'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L4')
+ self._traffic['l2']['dstmac'] = S.getValue('NICS')[1]['mac']
elif len(S.getValue('NICS')) and \
(S.getValue('NICS')[0]['type'] == 'vf' or
S.getValue('NICS')[1]['type'] == 'vf'):
@@ -181,8 +187,6 @@ class TestCase(object):
def run_initialize(self):
""" Prepare test execution environment
"""
- self._logger.debug(self.name)
-
# mount hugepages if needed
self._mount_hugepages()
@@ -338,7 +342,7 @@ class TestCase(object):
# ...and continue with traffic generation, but keep
# in mind, that clean deployment does not configure
# OVS nor executes the traffic
- if self.deployment != 'clean':
+ if self.deployment != 'clean' and not self._step_send_traffic:
self._traffic_ctl.send_traffic(self._traffic)
# dump vswitch flows before they are affected by VNF termination
@@ -360,23 +364,6 @@ class TestCase(object):
# report test results
self.run_report()
- # restore original settings
- for key in self._settings_original:
- S.setValue(key, self._settings_original[key])
-
- def _update_settings(self, param, value):
- """ Check value of given configuration parameter
- In case that new value is different, then testcase
- specific settings is updated and original value stored
-
- :param param: Name of parameter inside settings
- :param value: Disired parameter value
- """
- orig_value = S.getValue(param)
- if orig_value != value:
- self._settings_original[param] = copy.deepcopy(orig_value)
- S.setValue(param, value)
-
def _append_results(self, results):
"""
Method appends mandatory Test Case results to list of dictionaries.
@@ -684,18 +671,22 @@ class TestCase(object):
if self._step_vnf_list[vnf]:
self._step_vnf_list[vnf].stop()
- def step_eval_param(self, param, STEP):
- # pylint: disable=invalid-name
+ def step_eval_param(self, param, step_result):
""" Helper function for #STEP macro evaluation
"""
if isinstance(param, str):
# evaluate every #STEP reference inside parameter itself
- macros = re.findall(r'#STEP\[[\w\[\]\-\'\"]+\]', param)
+ macros = re.findall(r'(#STEP\[([\w\-:]+)\]((\[[\w\-\'\"]+\])*))', param)
+
if macros:
for macro in macros:
+ if macro[1] in self._step_result_mapping:
+ key = self._step_result_mapping[macro[1]]
+ else:
+ key = macro[1]
# pylint: disable=eval-used
- tmp_val = str(eval(macro[1:]))
- param = param.replace(macro, tmp_val)
+ tmp_val = str(eval('step_result[{}]{}'.format(key, macro[2])))
+ param = param.replace(macro[0], tmp_val)
# evaluate references to vsperf configuration options
macros = re.findall(r'\$(([\w\-]+)(\[[\w\[\]\-\'\"]+\])*)', param)
@@ -713,12 +704,12 @@ class TestCase(object):
elif isinstance(param, list) or isinstance(param, tuple):
tmp_list = []
for item in param:
- tmp_list.append(self.step_eval_param(item, STEP))
+ tmp_list.append(self.step_eval_param(item, step_result))
return tmp_list
elif isinstance(param, dict):
tmp_dict = {}
for (key, value) in param.items():
- tmp_dict[key] = self.step_eval_param(value, STEP)
+ tmp_dict[key] = self.step_eval_param(value, step_result)
return tmp_dict
else:
return param
@@ -732,6 +723,7 @@ class TestCase(object):
eval_params.append(self.step_eval_param(param, step_result))
return eval_params
+ # pylint: disable=too-many-locals, too-many-branches, too-many-statements
def step_run(self):
""" Execute actions specified by TestSteps list
@@ -753,6 +745,30 @@ class TestCase(object):
# run test step by step...
for i, step in enumerate(self.test):
step_ok = not self._step_check
+ step_check = self._step_check
+ regex = None
+ # configure step result mapping if step alias/label is detected
+ if step[0].startswith('#'):
+ key = step[0][1:]
+ if key.isdigit():
+ raise RuntimeError('Step alias can\'t be an integer value {}'.format(key))
+ if key in self._step_result_mapping:
+ raise RuntimeError('Step alias {} has been used already for step '
+ '{}'.format(key, self._step_result_mapping[key]))
+ self._step_result_mapping[step[0][1:]] = i
+ step = step[1:]
+
+ # store regex filter if it is specified
+ if isinstance(step[-1], str) and step[-1].startswith('|'):
+ # evalute macros and variables used in regex
+ regex = self.step_eval_params([step[-1][1:]], self._step_result[:i])[0]
+ step = step[:-1]
+
+ # check if step verification should be suppressed
+ if step[0].startswith('!'):
+ step_check = False
+ step_ok = True
+ step[0] = step[0][1:]
if step[0] == 'vswitch':
test_object = self._vswitch_ctl.get_vswitch()
elif step[0] == 'namespace':
@@ -772,6 +788,9 @@ class TestCase(object):
tmp_traffic = copy.deepcopy(self._traffic)
tmp_traffic.update(step[2])
step[2] = tmp_traffic
+ # store indication that traffic has been sent
+ # so it is not sent again after the execution of teststeps
+ self._step_send_traffic = True
elif step[0].startswith('vnf'):
if not self._step_vnf_list[step[0]]:
# initialize new VM
@@ -785,6 +804,15 @@ class TestCase(object):
self._logger.debug("Sleep %s seconds", step[1])
time.sleep(int(step[1]))
continue
+ elif step[0] == 'log':
+ test_object = self._logger
+ # there isn't a need for validation of log entry
+ step_check = False
+ step_ok = True
+ elif step[0] == 'pdb':
+ import pdb
+ pdb.set_trace()
+ continue
else:
self._logger.error("Unsupported test object %s", step[0])
self._step_status = {'status' : False, 'details' : ' '.join(step)}
@@ -793,7 +821,7 @@ class TestCase(object):
return False
test_method = getattr(test_object, step[1])
- if self._step_check:
+ if step_check:
test_method_check = getattr(test_object, CHECK_PREFIX + step[1])
else:
test_method_check = None
@@ -804,18 +832,24 @@ class TestCase(object):
# to support negative indexes
step_params = self.step_eval_params(step[2:], self._step_result[:i])
step_log = '{} {}'.format(' '.join(step[:2]), step_params)
+ step_log += ' filter "{}"'.format(regex) if regex else ''
self._logger.debug("Step %s '%s' start", i, step_log)
self._step_result[i] = test_method(*step_params)
+ if regex:
+ # apply regex to step output
+ self._step_result[i] = functions.filter_output(
+ self._step_result[i], regex)
+
self._logger.debug("Step %s '%s' results '%s'", i,
step_log, self._step_result[i])
time.sleep(S.getValue('TEST_STEP_DELAY'))
- if self._step_check:
+ if step_check:
step_ok = test_method_check(self._step_result[i], *step_params)
except (AssertionError, AttributeError, IndexError) as ex:
step_ok = False
self._logger.error("Step %s raised %s", i, type(ex).__name__)
- if self._step_check:
+ if step_check:
self.step_report_status("Step {} - '{}'".format(i, step_log), step_ok)
if not step_ok: