diff options
Diffstat (limited to 'testcases/testcase.py')
-rw-r--r-- | testcases/testcase.py | 260 |
1 files changed, 215 insertions, 45 deletions
diff --git a/testcases/testcase.py b/testcases/testcase.py index 7f22c18f..18ad4240 100644 --- a/testcases/testcase.py +++ b/testcases/testcase.py @@ -32,8 +32,12 @@ from core.results.results_constants import ResultsConstants from tools import tasks from tools import hugepages from tools import functions +from tools import namespace +from tools import veth +from tools.teststepstools import TestStepsTools from tools.pkt_gen.trafficgen.trafficgenhelper import TRAFFIC_DEFAULTS +CHECK_PREFIX = 'validate_' class TestCase(object): """TestCase base class @@ -60,6 +64,12 @@ class TestCase(object): self._settings_original = {} self._settings_paths_modified = False self._testcast_run_time = None + # initialization of step driven specific members + self._step_check = False # by default don't check result for step driven testcases + self._step_vnf_list = {} + self._step_result = [] + self._step_status = None + self._testcase_run_time = None # store all GUEST_ specific settings to keep original values before their expansion for key in S.__dict__: @@ -69,14 +79,21 @@ class TestCase(object): self._update_settings('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH'))) self._update_settings('VNF', cfg.get('VNF', S.getValue('VNF'))) self._update_settings('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN'))) - self._update_settings('TEST_PARAMS', cfg.get('Parameters', S.getValue('TEST_PARAMS'))) + test_params = copy.deepcopy(S.getValue('TEST_PARAMS')) + tc_test_params = cfg.get('Parameters', S.getValue('TEST_PARAMS')) + test_params.update(tc_test_params) + self._update_settings('TEST_PARAMS', test_params) + S.check_test_params() + + # override all redefined GUEST_ values to have them expanded correctly + tmp_test_params = copy.deepcopy(S.getValue('TEST_PARAMS')) + for key in tmp_test_params: + if key.startswith('GUEST_'): + S.setValue(key, S.getValue(key)) + S.getValue('TEST_PARAMS').pop(key) # update global settings functions.settings_update_paths() - guest_loopback = get_test_param('guest_loopback', None) - if guest_loopback: - # we can put just one item, it'll be expanded automatically for all VMs - self._update_settings('GUEST_LOOPBACK', [guest_loopback]) # set test parameters; CLI options take precedence to testcase settings self._logger = logging.getLogger(__name__) @@ -162,7 +179,9 @@ class TestCase(object): self._traffic['l2'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L2') self._traffic['l3'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L3') self._traffic['l4'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L4') - elif S.getValue('NICS')[0]['type'] == 'vf' or S.getValue('NICS')[1]['type'] == 'vf': + elif len(S.getValue('NICS')) and \ + (S.getValue('NICS')[0]['type'] == 'vf' or + S.getValue('NICS')[1]['type'] == 'vf'): mac1 = S.getValue('NICS')[0]['mac'] mac2 = S.getValue('NICS')[1]['mac'] if mac1 and mac2: @@ -170,6 +189,12 @@ class TestCase(object): else: self._logger.debug("MAC addresses can not be read") + # count how many VNFs are involved in TestSteps + if self.test: + for step in self.test: + if step[0].startswith('vnf'): + self._step_vnf_list[step[0]] = None + def run_initialize(self): """ Prepare test execution environment """ @@ -186,26 +211,31 @@ class TestCase(object): self._vnf_ctl = component_factory.create_vnf( self.deployment, - loader.get_vnf_class()) + loader.get_vnf_class(), + len(self._step_vnf_list)) # verify enough hugepages are free to run the testcase if not self._check_for_enough_hugepages(): raise RuntimeError('Not enough hugepages free to run test.') # perform guest related handling - if self._vnf_ctl.get_vnfs_number(): + tmp_vm_count = self._vnf_ctl.get_vnfs_number() + len(self._step_vnf_list) + if tmp_vm_count: # copy sources of l2 forwarding tools into VM shared dir if needed - self._copy_fwd_tools_for_all_guests(self._vnf_ctl.get_vnfs_number()) + self._copy_fwd_tools_for_all_guests(tmp_vm_count) # in case of multi VM in parallel, set the number of streams to the number of VMs if self.deployment.startswith('pvpv'): # for each VM NIC pair we need an unique stream streams = 0 - for vm_nic in S.getValue('GUEST_NICS_NR')[:self._vnf_ctl.get_vnfs_number()]: + for vm_nic in S.getValue('GUEST_NICS_NR')[:tmp_vm_count]: streams += int(vm_nic / 2) if vm_nic > 1 else 1 self._logger.debug("VMs with parallel connection were detected. " "Thus Number of streams was set to %s", streams) - self._traffic.update({'multistream': streams}) + # update streams if needed; In case of additional VNFs deployed by TestSteps + # user can define a proper stream count manually + if 'multistream' not in self._traffic or self._traffic['multistream'] < streams: + self._traffic.update({'multistream': streams}) # OVS Vanilla requires guest VM MAC address and IPs to work if 'linux_bridge' in S.getValue('GUEST_LOOPBACK'): @@ -235,11 +265,16 @@ class TestCase(object): self._output_file = os.path.join(self._results_dir, "result_" + self.name + "_" + self.deployment + ".csv") + self._step_status = {'status' : True, 'details' : ''} + self._logger.debug("Setup:") def run_finalize(self): """ Tear down test execution environment and record test results """ + # Stop all VNFs started by TestSteps in case that something went wrong + self.step_stop_vnfs() + # umount hugepages if mounted self._umount_hugepages() @@ -248,22 +283,20 @@ class TestCase(object): # cleanup any namespaces created if os.path.isdir('/tmp/namespaces'): - import tools.namespace namespace_list = os.listdir('/tmp/namespaces') if len(namespace_list): self._logger.info('Cleaning up namespaces') for name in namespace_list: - tools.namespace.delete_namespace(name) + namespace.delete_namespace(name) os.rmdir('/tmp/namespaces') # cleanup any veth ports created if os.path.isdir('/tmp/veth'): - import tools.veth veth_list = os.listdir('/tmp/veth') if len(veth_list): self._logger.info('Cleaning up veth ports') for eth in veth_list: port1, port2 = eth.split('-') - tools.veth.del_veth_port(port1, port2) + veth.del_veth_port(port1, port2) os.rmdir('/tmp/veth') def run_report(self): @@ -272,11 +305,12 @@ class TestCase(object): self._logger.debug("self._collector Results:") self._collector.print_results() - if S.getValue('mode') != 'trafficgen-off': + results = self._traffic_ctl.get_results() + if results: self._logger.debug("Traffic Results:") self._traffic_ctl.print_results() - self._tc_results = self._append_results(self._traffic_ctl.get_results()) + self._tc_results = self._append_results(results) TestCase.write_result_to_file(self._tc_results, self._output_file) def run(self): @@ -287,39 +321,35 @@ class TestCase(object): # prepare test execution environment self.run_initialize() - with self._vswitch_ctl, self._loadgen: - with self._vnf_ctl, self._collector: - if not self._vswitch_none: - self._add_flows() + try: + with self._vswitch_ctl, self._loadgen: + with self._vnf_ctl, self._collector: + if not self._vswitch_none: + self._add_flows() - # run traffic generator if requested, otherwise wait for manual termination - if S.getValue('mode') == 'trafficgen-off': - time.sleep(2) - self._logger.debug("All is set. Please run traffic generator manually.") - input(os.linesep + "Press Enter to terminate vswitchperf..." + os.linesep + os.linesep) - else: - if S.getValue('mode') == 'trafficgen-pause': - time.sleep(2) - true_vals = ('yes', 'y', 'ye', None) - while True: - choice = input(os.linesep + 'Transmission paused, should' - ' transmission be resumed? ' + os.linesep).lower() - if not choice or choice not in true_vals: - print('Please respond with \'yes\' or \'y\' ', end='') - else: - break with self._traffic_ctl: - self._traffic_ctl.send_traffic(self._traffic) + # execute test based on TestSteps definition if needed... + if self.step_run(): + # ...and continue with traffic generation, but keep + # in mind, that clean deployment does not configure + # OVS nor executes the traffic + if self.deployment != 'clean': + self._traffic_ctl.send_traffic(self._traffic) - # dump vswitch flows before they are affected by VNF termination - if not self._vswitch_none: - self._vswitch_ctl.dump_vswitch_flows() + # dump vswitch flows before they are affected by VNF termination + if not self._vswitch_none: + self._vswitch_ctl.dump_vswitch_flows() + + # garbage collection for case that TestSteps modify existing deployment + self.step_stop_vnfs() - # tear down test execution environment and log results - self.run_finalize() + finally: + # tear down test execution environment and log results + self.run_finalize() self._testcase_run_time = time.strftime("%H:%M:%S", - time.gmtime(time.time() - self._testcase_start_time)) + time.gmtime(time.time() - + self._testcase_start_time)) logging.info("Testcase execution time: " + self._testcase_run_time) # report test results self.run_report() @@ -334,7 +364,7 @@ class TestCase(object): """ orig_value = S.getValue(param) if orig_value != value: - self._settings_original[param] = orig_value + self._settings_original[param] = copy.deepcopy(orig_value) S.setValue(param, value) def _append_results(self, results): @@ -635,3 +665,143 @@ class TestCase(object): vswitch.add_flow(bridge, flow) else: pass + + + # + # TestSteps realted methods + # + def step_report_status(self, label, status): + """ Log status of test step + """ + self._logger.info("%s ... %s", label, 'OK' if status else 'FAILED') + + def step_stop_vnfs(self): + """ Stop all VNFs started by TestSteps + """ + for vnf in self._step_vnf_list: + self._step_vnf_list[vnf].stop() + + @staticmethod + def step_eval_param(param, STEP): + # pylint: disable=invalid-name + """ Helper function for #STEP macro evaluation + """ + if isinstance(param, str): + # evaluate every #STEP reference inside parameter itself + macros = re.findall(r'#STEP\[[\w\[\]\-\'\"]+\]', param) + if macros: + for macro in macros: + # pylint: disable=eval-used + tmp_val = str(eval(macro[1:])) + param = param.replace(macro, tmp_val) + return param + elif isinstance(param, list) or isinstance(param, tuple): + tmp_list = [] + for item in param: + tmp_list.append(TestCase.step_eval_param(item, STEP)) + return tmp_list + elif isinstance(param, dict): + tmp_dict = {} + for (key, value) in param.items(): + tmp_dict[key] = TestCase.step_eval_param(value, STEP) + return tmp_dict + else: + return param + + @staticmethod + def step_eval_params(params, step_result): + """ Evaluates referrences to results from previous steps + """ + eval_params = [] + # evaluate all parameters if needed + for param in params: + eval_params.append(TestCase.step_eval_param(param, step_result)) + return eval_params + + def step_run(self): + """ Execute actions specified by TestSteps list + + :return: False if any error was detected + True otherwise + """ + # anything to do? + if not self.test: + return True + + # required for VNFs initialization + loader = Loader() + # initialize list with results + self._step_result = [None] * len(self.test) + + # run test step by step... + for i, step in enumerate(self.test): + step_ok = not self._step_check + if step[0] == 'vswitch': + test_object = self._vswitch_ctl.get_vswitch() + elif step[0] == 'namespace': + test_object = namespace + elif step[0] == 'veth': + test_object = veth + elif step[0] == 'settings': + test_object = S + elif step[0] == 'tools': + test_object = TestStepsTools() + step[1] = step[1].title() + elif step[0] == 'trafficgen': + test_object = self._traffic_ctl + # in case of send_traffic or send_traffic_async methods, ensure + # that specified traffic values are merged with existing self._traffic + if step[1].startswith('send_traffic'): + tmp_traffic = copy.deepcopy(self._traffic) + tmp_traffic.update(step[2]) + step[2] = tmp_traffic + elif step[0].startswith('vnf'): + if not self._step_vnf_list[step[0]]: + # initialize new VM + self._step_vnf_list[step[0]] = loader.get_vnf_class()() + test_object = self._step_vnf_list[step[0]] + elif step[0] == 'wait': + input(os.linesep + "Step {}: Press Enter to continue with " + "the next step...".format(i) + os.linesep + os.linesep) + continue + else: + self._logger.error("Unsupported test object %s", step[0]) + self._step_status = {'status' : False, 'details' : ' '.join(step)} + self.step_report_status("Step '{}'".format(' '.join(step)), + self._step_status['status']) + return False + + test_method = getattr(test_object, step[1]) + if self._step_check: + test_method_check = getattr(test_object, CHECK_PREFIX + step[1]) + else: + test_method_check = None + + step_params = [] + try: + # eval parameters, but use only valid step_results + # to support negative indexes + step_params = TestCase.step_eval_params(step[2:], self._step_result[:i]) + step_log = '{} {}'.format(' '.join(step[:2]), step_params) + self._logger.debug("Step %s '%s' start", i, step_log) + self._step_result[i] = test_method(*step_params) + self._logger.debug("Step %s '%s' results '%s'", i, + step_log, self._step_result[i]) + time.sleep(5) + if self._step_check: + step_ok = test_method_check(self._step_result[i], *step_params) + except (AssertionError, AttributeError, IndexError) as ex: + step_ok = False + self._logger.error("Step %s raised %s", i, type(ex).__name__) + + if self._step_check: + self.step_report_status("Step {} - '{}'".format(i, step_log), step_ok) + + if not step_ok: + self._step_status = {'status' : False, 'details' : step_log} + # Stop all VNFs started by TestSteps + self.step_stop_vnfs() + return False + + # all steps processed without any issue + return True |