summaryrefslogtreecommitdiffstats
path: root/testcases/testcase.py
diff options
context:
space:
mode:
Diffstat (limited to 'testcases/testcase.py')
-rw-r--r--testcases/testcase.py260
1 files changed, 215 insertions, 45 deletions
diff --git a/testcases/testcase.py b/testcases/testcase.py
index 7f22c18f..18ad4240 100644
--- a/testcases/testcase.py
+++ b/testcases/testcase.py
@@ -32,8 +32,12 @@ from core.results.results_constants import ResultsConstants
from tools import tasks
from tools import hugepages
from tools import functions
+from tools import namespace
+from tools import veth
+from tools.teststepstools import TestStepsTools
from tools.pkt_gen.trafficgen.trafficgenhelper import TRAFFIC_DEFAULTS
+CHECK_PREFIX = 'validate_'
class TestCase(object):
"""TestCase base class
@@ -60,6 +64,12 @@ class TestCase(object):
self._settings_original = {}
self._settings_paths_modified = False
self._testcast_run_time = None
+ # initialization of step driven specific members
+ self._step_check = False # by default don't check result for step driven testcases
+ self._step_vnf_list = {}
+ self._step_result = []
+ self._step_status = None
+ self._testcase_run_time = None
# store all GUEST_ specific settings to keep original values before their expansion
for key in S.__dict__:
@@ -69,14 +79,21 @@ class TestCase(object):
self._update_settings('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH')))
self._update_settings('VNF', cfg.get('VNF', S.getValue('VNF')))
self._update_settings('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN')))
- self._update_settings('TEST_PARAMS', cfg.get('Parameters', S.getValue('TEST_PARAMS')))
+ test_params = copy.deepcopy(S.getValue('TEST_PARAMS'))
+ tc_test_params = cfg.get('Parameters', S.getValue('TEST_PARAMS'))
+ test_params.update(tc_test_params)
+ self._update_settings('TEST_PARAMS', test_params)
+ S.check_test_params()
+
+ # override all redefined GUEST_ values to have them expanded correctly
+ tmp_test_params = copy.deepcopy(S.getValue('TEST_PARAMS'))
+ for key in tmp_test_params:
+ if key.startswith('GUEST_'):
+ S.setValue(key, S.getValue(key))
+ S.getValue('TEST_PARAMS').pop(key)
# update global settings
functions.settings_update_paths()
- guest_loopback = get_test_param('guest_loopback', None)
- if guest_loopback:
- # we can put just one item, it'll be expanded automatically for all VMs
- self._update_settings('GUEST_LOOPBACK', [guest_loopback])
# set test parameters; CLI options take precedence to testcase settings
self._logger = logging.getLogger(__name__)
@@ -162,7 +179,9 @@ class TestCase(object):
self._traffic['l2'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L2')
self._traffic['l3'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L3')
self._traffic['l4'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L4')
- elif S.getValue('NICS')[0]['type'] == 'vf' or S.getValue('NICS')[1]['type'] == 'vf':
+ elif len(S.getValue('NICS')) and \
+ (S.getValue('NICS')[0]['type'] == 'vf' or
+ S.getValue('NICS')[1]['type'] == 'vf'):
mac1 = S.getValue('NICS')[0]['mac']
mac2 = S.getValue('NICS')[1]['mac']
if mac1 and mac2:
@@ -170,6 +189,12 @@ class TestCase(object):
else:
self._logger.debug("MAC addresses can not be read")
+ # count how many VNFs are involved in TestSteps
+ if self.test:
+ for step in self.test:
+ if step[0].startswith('vnf'):
+ self._step_vnf_list[step[0]] = None
+
def run_initialize(self):
""" Prepare test execution environment
"""
@@ -186,26 +211,31 @@ class TestCase(object):
self._vnf_ctl = component_factory.create_vnf(
self.deployment,
- loader.get_vnf_class())
+ loader.get_vnf_class(),
+ len(self._step_vnf_list))
# verify enough hugepages are free to run the testcase
if not self._check_for_enough_hugepages():
raise RuntimeError('Not enough hugepages free to run test.')
# perform guest related handling
- if self._vnf_ctl.get_vnfs_number():
+ tmp_vm_count = self._vnf_ctl.get_vnfs_number() + len(self._step_vnf_list)
+ if tmp_vm_count:
# copy sources of l2 forwarding tools into VM shared dir if needed
- self._copy_fwd_tools_for_all_guests(self._vnf_ctl.get_vnfs_number())
+ self._copy_fwd_tools_for_all_guests(tmp_vm_count)
# in case of multi VM in parallel, set the number of streams to the number of VMs
if self.deployment.startswith('pvpv'):
# for each VM NIC pair we need an unique stream
streams = 0
- for vm_nic in S.getValue('GUEST_NICS_NR')[:self._vnf_ctl.get_vnfs_number()]:
+ for vm_nic in S.getValue('GUEST_NICS_NR')[:tmp_vm_count]:
streams += int(vm_nic / 2) if vm_nic > 1 else 1
self._logger.debug("VMs with parallel connection were detected. "
"Thus Number of streams was set to %s", streams)
- self._traffic.update({'multistream': streams})
+ # update streams if needed; In case of additional VNFs deployed by TestSteps
+ # user can define a proper stream count manually
+ if 'multistream' not in self._traffic or self._traffic['multistream'] < streams:
+ self._traffic.update({'multistream': streams})
# OVS Vanilla requires guest VM MAC address and IPs to work
if 'linux_bridge' in S.getValue('GUEST_LOOPBACK'):
@@ -235,11 +265,16 @@ class TestCase(object):
self._output_file = os.path.join(self._results_dir, "result_" + self.name +
"_" + self.deployment + ".csv")
+ self._step_status = {'status' : True, 'details' : ''}
+
self._logger.debug("Setup:")
def run_finalize(self):
""" Tear down test execution environment and record test results
"""
+ # Stop all VNFs started by TestSteps in case that something went wrong
+ self.step_stop_vnfs()
+
# umount hugepages if mounted
self._umount_hugepages()
@@ -248,22 +283,20 @@ class TestCase(object):
# cleanup any namespaces created
if os.path.isdir('/tmp/namespaces'):
- import tools.namespace
namespace_list = os.listdir('/tmp/namespaces')
if len(namespace_list):
self._logger.info('Cleaning up namespaces')
for name in namespace_list:
- tools.namespace.delete_namespace(name)
+ namespace.delete_namespace(name)
os.rmdir('/tmp/namespaces')
# cleanup any veth ports created
if os.path.isdir('/tmp/veth'):
- import tools.veth
veth_list = os.listdir('/tmp/veth')
if len(veth_list):
self._logger.info('Cleaning up veth ports')
for eth in veth_list:
port1, port2 = eth.split('-')
- tools.veth.del_veth_port(port1, port2)
+ veth.del_veth_port(port1, port2)
os.rmdir('/tmp/veth')
def run_report(self):
@@ -272,11 +305,12 @@ class TestCase(object):
self._logger.debug("self._collector Results:")
self._collector.print_results()
- if S.getValue('mode') != 'trafficgen-off':
+ results = self._traffic_ctl.get_results()
+ if results:
self._logger.debug("Traffic Results:")
self._traffic_ctl.print_results()
- self._tc_results = self._append_results(self._traffic_ctl.get_results())
+ self._tc_results = self._append_results(results)
TestCase.write_result_to_file(self._tc_results, self._output_file)
def run(self):
@@ -287,39 +321,35 @@ class TestCase(object):
# prepare test execution environment
self.run_initialize()
- with self._vswitch_ctl, self._loadgen:
- with self._vnf_ctl, self._collector:
- if not self._vswitch_none:
- self._add_flows()
+ try:
+ with self._vswitch_ctl, self._loadgen:
+ with self._vnf_ctl, self._collector:
+ if not self._vswitch_none:
+ self._add_flows()
- # run traffic generator if requested, otherwise wait for manual termination
- if S.getValue('mode') == 'trafficgen-off':
- time.sleep(2)
- self._logger.debug("All is set. Please run traffic generator manually.")
- input(os.linesep + "Press Enter to terminate vswitchperf..." + os.linesep + os.linesep)
- else:
- if S.getValue('mode') == 'trafficgen-pause':
- time.sleep(2)
- true_vals = ('yes', 'y', 'ye', None)
- while True:
- choice = input(os.linesep + 'Transmission paused, should'
- ' transmission be resumed? ' + os.linesep).lower()
- if not choice or choice not in true_vals:
- print('Please respond with \'yes\' or \'y\' ', end='')
- else:
- break
with self._traffic_ctl:
- self._traffic_ctl.send_traffic(self._traffic)
+ # execute test based on TestSteps definition if needed...
+ if self.step_run():
+ # ...and continue with traffic generation, but keep
+ # in mind, that clean deployment does not configure
+ # OVS nor executes the traffic
+ if self.deployment != 'clean':
+ self._traffic_ctl.send_traffic(self._traffic)
- # dump vswitch flows before they are affected by VNF termination
- if not self._vswitch_none:
- self._vswitch_ctl.dump_vswitch_flows()
+ # dump vswitch flows before they are affected by VNF termination
+ if not self._vswitch_none:
+ self._vswitch_ctl.dump_vswitch_flows()
+
+ # garbage collection for case that TestSteps modify existing deployment
+ self.step_stop_vnfs()
- # tear down test execution environment and log results
- self.run_finalize()
+ finally:
+ # tear down test execution environment and log results
+ self.run_finalize()
self._testcase_run_time = time.strftime("%H:%M:%S",
- time.gmtime(time.time() - self._testcase_start_time))
+ time.gmtime(time.time() -
+ self._testcase_start_time))
logging.info("Testcase execution time: " + self._testcase_run_time)
# report test results
self.run_report()
@@ -334,7 +364,7 @@ class TestCase(object):
"""
orig_value = S.getValue(param)
if orig_value != value:
- self._settings_original[param] = orig_value
+ self._settings_original[param] = copy.deepcopy(orig_value)
S.setValue(param, value)
def _append_results(self, results):
@@ -635,3 +665,143 @@ class TestCase(object):
vswitch.add_flow(bridge, flow)
else:
pass
+
+
+ #
+ # TestSteps realted methods
+ #
+ def step_report_status(self, label, status):
+ """ Log status of test step
+ """
+ self._logger.info("%s ... %s", label, 'OK' if status else 'FAILED')
+
+ def step_stop_vnfs(self):
+ """ Stop all VNFs started by TestSteps
+ """
+ for vnf in self._step_vnf_list:
+ self._step_vnf_list[vnf].stop()
+
+ @staticmethod
+ def step_eval_param(param, STEP):
+ # pylint: disable=invalid-name
+ """ Helper function for #STEP macro evaluation
+ """
+ if isinstance(param, str):
+ # evaluate every #STEP reference inside parameter itself
+ macros = re.findall(r'#STEP\[[\w\[\]\-\'\"]+\]', param)
+ if macros:
+ for macro in macros:
+ # pylint: disable=eval-used
+ tmp_val = str(eval(macro[1:]))
+ param = param.replace(macro, tmp_val)
+ return param
+ elif isinstance(param, list) or isinstance(param, tuple):
+ tmp_list = []
+ for item in param:
+ tmp_list.append(TestCase.step_eval_param(item, STEP))
+ return tmp_list
+ elif isinstance(param, dict):
+ tmp_dict = {}
+ for (key, value) in param.items():
+ tmp_dict[key] = TestCase.step_eval_param(value, STEP)
+ return tmp_dict
+ else:
+ return param
+
+ @staticmethod
+ def step_eval_params(params, step_result):
+ """ Evaluates referrences to results from previous steps
+ """
+ eval_params = []
+ # evaluate all parameters if needed
+ for param in params:
+ eval_params.append(TestCase.step_eval_param(param, step_result))
+ return eval_params
+
+ def step_run(self):
+ """ Execute actions specified by TestSteps list
+
+ :return: False if any error was detected
+ True otherwise
+ """
+ # anything to do?
+ if not self.test:
+ return True
+
+ # required for VNFs initialization
+ loader = Loader()
+ # initialize list with results
+ self._step_result = [None] * len(self.test)
+
+ # run test step by step...
+ for i, step in enumerate(self.test):
+ step_ok = not self._step_check
+ if step[0] == 'vswitch':
+ test_object = self._vswitch_ctl.get_vswitch()
+ elif step[0] == 'namespace':
+ test_object = namespace
+ elif step[0] == 'veth':
+ test_object = veth
+ elif step[0] == 'settings':
+ test_object = S
+ elif step[0] == 'tools':
+ test_object = TestStepsTools()
+ step[1] = step[1].title()
+ elif step[0] == 'trafficgen':
+ test_object = self._traffic_ctl
+ # in case of send_traffic or send_traffic_async methods, ensure
+ # that specified traffic values are merged with existing self._traffic
+ if step[1].startswith('send_traffic'):
+ tmp_traffic = copy.deepcopy(self._traffic)
+ tmp_traffic.update(step[2])
+ step[2] = tmp_traffic
+ elif step[0].startswith('vnf'):
+ if not self._step_vnf_list[step[0]]:
+ # initialize new VM
+ self._step_vnf_list[step[0]] = loader.get_vnf_class()()
+ test_object = self._step_vnf_list[step[0]]
+ elif step[0] == 'wait':
+ input(os.linesep + "Step {}: Press Enter to continue with "
+ "the next step...".format(i) + os.linesep + os.linesep)
+ continue
+ else:
+ self._logger.error("Unsupported test object %s", step[0])
+ self._step_status = {'status' : False, 'details' : ' '.join(step)}
+ self.step_report_status("Step '{}'".format(' '.join(step)),
+ self._step_status['status'])
+ return False
+
+ test_method = getattr(test_object, step[1])
+ if self._step_check:
+ test_method_check = getattr(test_object, CHECK_PREFIX + step[1])
+ else:
+ test_method_check = None
+
+ step_params = []
+ try:
+ # eval parameters, but use only valid step_results
+ # to support negative indexes
+ step_params = TestCase.step_eval_params(step[2:], self._step_result[:i])
+ step_log = '{} {}'.format(' '.join(step[:2]), step_params)
+ self._logger.debug("Step %s '%s' start", i, step_log)
+ self._step_result[i] = test_method(*step_params)
+ self._logger.debug("Step %s '%s' results '%s'", i,
+ step_log, self._step_result[i])
+ time.sleep(5)
+ if self._step_check:
+ step_ok = test_method_check(self._step_result[i], *step_params)
+ except (AssertionError, AttributeError, IndexError) as ex:
+ step_ok = False
+ self._logger.error("Step %s raised %s", i, type(ex).__name__)
+
+ if self._step_check:
+ self.step_report_status("Step {} - '{}'".format(i, step_log), step_ok)
+
+ if not step_ok:
+ self._step_status = {'status' : False, 'details' : step_log}
+ # Stop all VNFs started by TestSteps
+ self.step_stop_vnfs()
+ return False
+
+ # all steps processed without any issue
+ return True