aboutsummaryrefslogtreecommitdiffstats
path: root/xtesting/core/mts.py
blob: a3cbab9d60d7b242ddbeb2943a27926bbe0b1e4f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
#!/usr/bin/env python

# Copyright (c) 2020 Orange and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0

# pylint: disable=too-many-instance-attributes

"""Define the parent classes of all Xtesting Features.

Feature is considered as TestCase offered by Third-party. It offers
helpers to run any python method or any bash command.
"""

import csv
import logging
import os
import shutil
import subprocess
import sys
import time

from lxml import etree
import prettytable
import six

from xtesting.core import testcase


__author__ = ("Vincent Mahe <v.mahe@orange.com>, "
              "Cedric Ollivier <cedric.ollivier@orange.com>")


class MTSLauncher(testcase.TestCase):
    """Class designed to run MTS tests.

    It reads the MTS testcases declared in an XML test file, launches
    `bin/startCmd.sh` from the MTS install dir and computes the result
    from the `logs/testPlan.csv` file found in the same install dir.
    """

    # Logger named after this module
    __logger = logging.getLogger(__name__)
    # Directory where MTS is installed (it holds bin/startCmd.sh)
    mts_install_dir = "/opt/mts"

    def check_requirements(self):
        """Skip the test case when the MTS launcher script is missing.

        The script looked for is `bin/startCmd.sh` in the MTS install dir.
        """
        launcher = os.path.join(self.mts_install_dir, 'bin/startCmd.sh')
        if os.path.exists(launcher):
            return
        self.__logger.warning(
            "mts is not available for arm for the time being")
        self.is_skipped = True

    def __init__(self, **kwargs):
        """Set the MTS output locations and reset the test counters.

        Args:
            kwargs: Arbitrary keyword arguments passed to the parent class.
        """
        super(MTSLauncher, self).__init__(**kwargs)
        # File receiving the standard output of MTS
        self.result_file = "{0}/{1}.log".format(self.res_dir, self.case_name)
        # Location of the HTML report generated by MTS
        self.mts_stats_dir = os.path.join(self.res_dir, 'mts_stats_report')
        # Location of the log files generated by MTS for each test.
        # The empty last component makes the path end with a separator,
        # which is needed because of a bug in MTS.
        self.mts_logs_dir = os.path.join(self.res_dir, 'mts_logs', '')
        # The location of the file named testPlan.csv
        # which is always in $MTS_HOME/logs
        self.mts_result_csv_file = os.path.sep.join(
            (self.mts_install_dir, "logs", "testPlan.csv"))
        # Counters updated when parsing testPlan.csv
        self.total_tests = self.pass_tests = 0
        self.fail_tests = self.skip_tests = 0
        self.response = None
        # Names of the MTS testcases found in the XML test file
        self.testcases = []
    def parse_results(self):
        """Parse testPlan.csv containing the status of each testcase of the test file.
        See sample file in `xtesting/samples/mts/output/testPlan.csv`
        """
        with open(self.mts_result_csv_file) as stream_:
            self.__logger.info("Parsing file : %s", self.mts_result_csv_file)
            reader = csv.reader(stream_, delimiter=';')
            rownum = 0
            _tests_data = []
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['MTS test', 'MTS test case',
                             'status'])
            for row in reader:
                _test_dict = {}
                nb_values = len(row)
                if rownum > 0:
                    # If there's only one delimiter,
                    # it is the name of the <test> elt
                    if nb_values == 2:
                        test_name = row[0]
                        _test_dict['parent'] = test_name
                    elif nb_values == 3:
                        testcase_name = row[0].lstrip()
                        testcase_status = row[2]
                        self.total_tests += 1
                        if testcase_status == 'OK':
                            self.pass_tests += 1
                        elif testcase_status == 'Failed':
                            self.fail_tests += 1
                        elif testcase_status == '?':
                            self.skip_tests += 1
                        _test_dict['status'] = testcase_status
                        _test_dict['name'] = testcase_name
                        msg.add_row(
                            [test_name,
                             _test_dict['name'],
                             _test_dict['status']])
                rownum += 1
                _tests_data.append(_test_dict)
            try:
                self.result = 100 * (
                    self.pass_tests / self.total_tests)
            except ZeroDivisionError:
                self.__logger.error("No test has been run")
            self.__logger.info("MTS Test result:\n\n%s\n", msg.get_string())
            self.details = {}
            self.details['description'] = "Execution of some MTS tests"
            self.details['total_tests'] = self.total_tests
            self.details['pass_tests'] = self.pass_tests
            self.details['fail_tests'] = self.fail_tests
            self.details['skip_tests'] = self.skip_tests
            self.details['tests'] = _tests_data

    def parse_xml_test_file(self, xml_test_file):
        """Parse the XML file containing the test definition for MTS.
        See sample file in `xtesting/samples/mts/test.xml`
        """
        nb_testcases = -1
        self.__logger.info(
            "Parsing XML test file %s containing the MTS tests definitions.",
            xml_test_file)
        try:
            parser = etree.XMLParser(load_dtd=True, resolve_entities=True)
            self.__logger.info("XML test file %s successfully parsed.",
                               xml_test_file)
            root = etree.parse(xml_test_file, parser=parser)
            # Need to look at all child nodes because there may be
            # some <for> elt between <test> and <testcase> elt
            self.testcases = root.xpath('//test//testcase/@name')
            nb_testcases = len(self.testcases)
            if nb_testcases == 0:
                self.__logger.warning("Found no MTS testcase !")
            elif nb_testcases == 1:
                self.__logger.info("Found only one MTS testcase: %s",
                                   self.testcases[0])
            else:
                self.__logger.info("Found %d MTS testcases :", nb_testcases)
                for mts_testcase in self.testcases:
                    self.__logger.info("    - %s", mts_testcase)
        except etree.XMLSyntaxError as xml_err:
            self.__logger.error("Error while parsing XML test file: %s",
                                str(xml_err))
        return nb_testcases

    def check_enabled_mts_test_cases(self, enabled_testcases):
        """Tell if all enabled MTS test cases are declared in the XML file.

        Each name is looked up in `testcases`. The check stops at the
        first unknown name, which is logged as an error.

        Args:
            enabled_testcases: names of the MTS test cases to check.

        Returns:
            False if one name is unknown,
            True otherwise (including when there is nothing to check).
        """
        if not enabled_testcases:
            return True
        for wanted in enabled_testcases:
            if wanted in self.testcases:
                continue
            self.__logger.error(
                "The required MTS testcase named `%s` does not exist"
                " !", wanted)
            return False
        return True

    def execute(self, **kwargs):  # pylint: disable=too-many-locals
        """Run MTS on the given test file and save its output.

        The previous MTS outputs (testPlan.csv, statistics and logs dirs)
        are removed before launching `startCmd.sh` from `$MTS_HOME/bin`.
        The MTS standard output and error are written to `result_file`.

        Args:
            kwargs: Arbitrary keyword arguments. The following are read:

                * test_file (mandatory): the MTS XML test file,
                * testcases: names of the MTS testcases to run
                  (default: none given to MTS),
                * console: also print the MTS output on stdout
                  (default: False),
                * log_level: value given to -levelLog (default: INFO),
                * store_method: value given to -storageLog
                  (default: FILE),
                * max_duration: timeout in seconds when waiting for the
                  end of MTS (default: None, Python 3 only).

        Returns:
            the return code of MTS if it ends by itself,
            3 if MTS is killed after max_duration,
            -2 if an enabled testcase is unknown,
            -1 if test_file is missing.
        """
        # test_file is the only mandatory arg: it is read on its own so
        # that an unrelated KeyError is never reported as a missing arg
        try:
            test_file = kwargs["test_file"]
        except KeyError:
            self.__logger.error("Missing mandatory arg for MTS. kwargs: %s",
                                kwargs)
            return -1
        console = kwargs.get("console", False)
        log_level = kwargs.get("log_level", "INFO")
        # For some MTS tests, we need to force stop after N sec
        max_duration = kwargs.get("max_duration")
        store_method = kwargs.get("store_method", "FILE")
        # Must use the $HOME_MTS/bin as current working dir
        cwd = self.mts_install_dir + os.path.sep + "bin"

        # Get the list of enabled MTS testcases, if any
        enabled_testcases = kwargs.get("testcases", [])
        enabled_testcases_str = ''
        if enabled_testcases:
            enabled_testcases_str = ' '.join(enabled_testcases)
            if not self.check_enabled_mts_test_cases(enabled_testcases):
                return -2

        # Build command line to launch for MTS
        # NOTE(review): the command is run by a shell, so the values
        # coming from kwargs must be trusted.
        cmd = ("cd {} && ./startCmd.sh {} {} -sequential -levelLog:{}"
               " -storageLog:{}"
               " -config:stats.REPORT_DIRECTORY+{}"
               " -config:logs.STORAGE_DIRECTORY+{}"
               " -genReport:true"
               " -showRep:false").format(cwd,
                                         test_file,
                                         enabled_testcases_str,
                                         log_level,
                                         store_method,
                                         self.mts_stats_dir,
                                         self.mts_logs_dir)

        # Make sure to create the necessary output sub-folders for MTS
        # and cleanup output files from previous run.
        if os.path.exists(self.mts_result_csv_file):
            os.remove(self.mts_result_csv_file)

        if os.path.isdir(self.mts_stats_dir):
            shutil.rmtree(self.mts_stats_dir)
        os.makedirs(self.mts_stats_dir)

        if os.path.isdir(self.mts_logs_dir):
            shutil.rmtree(self.mts_logs_dir)
        os.makedirs(self.mts_logs_dir)

        self.__logger.info(
            "MTS statistics output dir: %s ", self.mts_stats_dir)
        self.__logger.info("MTS logs output dir: %s ", self.mts_logs_dir)

        # Launch MTS as a sub-process
        # and save its standard output to a file
        with open(self.result_file, 'w') as f_stdout:
            self.__logger.info("Calling %s", cmd)
            process = subprocess.Popen(
                cmd, shell=True, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
            for raw_line in iter(process.stdout.readline, b''):
                # Decode once; an invalid byte must not stop the run
                line = raw_line.decode("utf-8", "replace")
                if console:
                    sys.stdout.write(line)
                f_stdout.write(line)
            # End of output reached: the pipe is no longer needed
            process.stdout.close()
            # NOTE(review): the loop above only ends once the output of
            # MTS is closed, so max_duration is counted from that point.
            if six.PY3:
                try:
                    # pylint: disable=unexpected-keyword-arg
                    process.wait(timeout=max_duration)
                # pylint: disable=no-member
                except subprocess.TimeoutExpired:
                    process.kill()
                    # Reap the killed process to avoid leaving a zombie
                    process.wait()
                    self.__logger.info(
                        "Killing MTS process after %d second(s).",
                        max_duration)
                    return 3
            else:
                process.wait()
        with open(self.result_file, 'r') as f_stdin:
            self.__logger.debug("$ %s\n%s", cmd, f_stdin.read().rstrip())
        return process.returncode

    def run(self, **kwargs):
        """Run the MTS tests.

        It parses the XML test file, calls execute() if the file declares
        at least one MTS testcase, then parses the results written by MTS.

        It sets the following attributes required to push the results
        to DB:

            * result,
            * start_time,
            * stop_time.

        Any exception is logged and reported as a run error.

        Args:
            kwargs: Arbitrary keyword arguments (test_file is required).

        Returns:
            TestCase.EX_OK if execute() returns 0 and the results are
            parsed,
            TestCase.EX_RUN_ERROR otherwise.
        """
        self.start_time = time.time()
        status = testcase.TestCase.EX_RUN_ERROR
        self.result = 0
        try:
            found = self.parse_xml_test_file(kwargs["test_file"])
            # MTS is only launched if the test file declares some MTS
            # test cases
            if found > 0 and self.execute(**kwargs) == 0:
                status = testcase.TestCase.EX_OK
                try:
                    self.parse_results()
                except Exception:  # pylint: disable=broad-except
                    self.__logger.exception(
                        "Cannot parse result file "
                        "$MTS_HOME/logs/testPlan.csv")
                    status = testcase.TestCase.EX_RUN_ERROR
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("%s FAILED", self.project_name)
        self.stop_time = time.time()
        return status