summaryrefslogtreecommitdiffstats
path: root/moonclient/moonclient/tests.py
blob: dd0dd7dec16a0bfd8c26c1bfd09c808b9a2d7a32 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
# This software is distributed under the terms and conditions of the 'Apache-2.0'
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.

import logging
import json
import shlex
import re
from cliff.lister import Lister
from cliff.command import Command
from uuid import uuid4
import os
import time
import subprocess


class TestsLaunch(Lister):
    """Tests launcher."""

    # NOTE(review): the class docstring above is kept unchanged on purpose;
    # cliff appears to use it as the command's help text -- confirm before
    # rewording it.
    #
    # Expected layout of the JSON test file, as read by take_action():
    #   {
    #     "command_options": "<options appended to every command>",  (optional)
    #     "tests_group": {
    #       "<group name>": [
    #         {
    #           "name": "...",
    #           "description": "...",
    #           "result": "<regex searched in the command output>",
    #           "command": "<moonclient sub-command>",        (or)
    #           "external_command": "<program run as a subprocess>",
    #           "command_options": "<overrides the global options>"  (optional)
    #         }
    #       ]
    #     }
    #   }
    # Named groups ("(?P<name>...)") matched by a "result" regex are stored in
    # result_vars and can be referenced as "$name" in later commands/results.

    log = logging.getLogger(__name__)
    # Class-level dictionary: captured variables are shared by every instance.
    result_vars = dict()
    logfile_name = "/tmp/moonclient_test_{}.log".format(time.strftime("%Y%m%d-%H%M%S"))
    # Opened when the class body is executed and kept open afterwards;
    # take_action() only flushes it.
    logfile = open(logfile_name, "w")

    def get_parser(self, prog_name):
        """Return the argument parser, extended with the mandatory test file name."""
        parser = super(TestsLaunch, self).get_parser(prog_name)
        parser.add_argument(
            'testfile',
            metavar='<filename>',
            help='Filename that contains tests to run',
        )
        return parser

    def __replace_var_in_str(self, data_str):
        """Expand every known "$name" reference found in data_str.

        :param data_str: string that may contain "$name" references
        :return: data_str where each "$name" whose name is a key of
                 result_vars is replaced by the stored value; unknown names
                 and names whose stored value is None are left untouched
        """
        self.log.debug("__replace_var_in_str " + data_str)
        for exp in re.findall(r"\$\w+", data_str):
            self.log.debug("--->" + exp + str(self.result_vars))
            # exp always starts with exactly one "$" (see the pattern above).
            value = self.result_vars.get(exp[1:])
            if value is None:
                # Unknown variable, or a named group that did not take part in
                # the match: there is nothing to substitute.
                continue
            # The negative lookahead prevents "$ab" from being rewritten when
            # expanding "$a".  The replacement is given as a function so that
            # the value is inserted literally: a plain string would have its
            # backslashes interpreted as escapes / group references by re.sub.
            data_str = re.sub(re.escape(exp) + r"(?!\w)", lambda _match, _value=value: _value, data_str)
        self.log.debug("__replace_var_in_str " + data_str)
        return data_str

    def __compare_results(self, expected, observed):
        """Search the regex `expected` in `observed`.

        On success the named groups of the match are merged into result_vars.

        :param expected: regular expression describing the expected output
        :param observed: output produced by the executed command
        :return: True if the regex was found, False otherwise
        """
        match = re.search(expected, observed)
        if match:
            self.result_vars.update(match.groupdict())
            return True
        return False

    def _run_external_command(self, test):
        """Run test["external_command"] as a subprocess and log its output.

        :param test: test description (a dictionary)
        :return: tuple (stdout, stderr) as returned by Popen.communicate()
        """
        ext_command = self.__replace_var_in_str(test["external_command"])
        self.logfile.write("-----> {}\n".format(ext_command))
        self.log.info("    \\-executing external \"{}\"".format(ext_command))
        pipe = subprocess.Popen(shlex.split(ext_command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        result_str, error_str = pipe.communicate()
        self.logfile.write("stdout: {}\n".format(result_str))
        self.logfile.write("stderr: {}\n".format(error_str))
        return result_str, error_str

    def _run_moon_command(self, test, global_command_options):
        """Run test["command"] through the application and return what it printed.

        :param test: test description (a dictionary)
        :param global_command_options: options appended to the command when the
                                       test has no "command_options" of its own
        :return: the text written by the sub-command on the application stdout
        """
        if "command_options" in test:
            command = test["command"] + " " + test["command_options"]
        else:
            command = test["command"] + " " + global_command_options
        command = self.__replace_var_in_str(command)
        self.logfile.write("-----> {}\n".format(command))
        self.log.info("    \\-executing {}".format(command))
        tmp_filename = os.path.join("/tmp", uuid4().hex)
        stdout_back = self.app.stdout
        try:
            with open(tmp_filename, "w") as tmp_file:
                # Capture the sub-command output by temporarily swapping the
                # application stdout; always put the real one back, even when
                # the sub-command raises.
                self.app.stdout = tmp_file
                try:
                    self.app.run_subcommand(shlex.split(command))
                finally:
                    self.app.stdout = stdout_back
            with open(tmp_filename, "r") as tmp_file:
                result_str = tmp_file.read()
        finally:
            # The capture file is only a scratch buffer; do not leave it behind.
            if os.path.exists(tmp_filename):
                os.remove(tmp_filename)
        self.logfile.write("{}".format(result_str))
        return result_str

    def take_action(self, parsed_args):
        """Execute every test of the given JSON file.

        :param parsed_args: parsed command line; `testfile` is the JSON file
        :return: tuple (column names, rows); one row per test plus one
                 "Overall results" row per group
        :raises Exception: if no test file name was given
        """
        self.log.info("Write tests output to {}".format(self.logfile_name))
        if not parsed_args.testfile:
            self.log.error("You don't give a test filename.")
            raise Exception("Cannot execute tests.")
        with open(parsed_args.testfile) as testfile:
            tests_dict = json.load(testfile)
        self.log.debug("tests_dict = {}".format(tests_dict))
        global_command_options = tests_dict.get("command_options", "")
        data = list()
        # items() is available on Python 2 and Python 3 dictionaries.
        for group_name, tests_list in tests_dict["tests_group"].items():
            overall_result = True
            self.log.info("\n\033[1mgroup {}\033[0m".format(group_name))
            self.logfile.write("{}:\n\n".format(group_name))
            test_count = len(tests_list)
            for test in tests_list:
                self.log.debug("test={}".format(test))
                error_str = ""
                if "command" in test:
                    result_str = self._run_moon_command(test, global_command_options)
                else:
                    result_str, error_str = self._run_external_command(test)
                test_name = test["name"]
                # Expand the expected pattern once, so that the pattern written
                # to the log file is exactly the one used for the comparison.
                expected = self.__replace_var_in_str(test["result"])
                compare = self.__compare_results(expected, result_str)
                self.logfile.write("----->{} ({})\n\n".format(compare, expected))
                if error_str:
                    # Anything written on stderr by an external command makes
                    # the test fail, whatever the comparison says.
                    overall_result = False
                    compare_str = "\033[1m\033[31mFalse\033[m"
                else:
                    overall_result = overall_result and compare
                    if compare:
                        compare_str = "\033[32mTrue\033[m"
                    else:
                        compare_str = "\033[1m\033[31mFalse\033[m"
                data.append([group_name, test_name, compare_str, test["description"]])
            if overall_result:
                overall_str = "\033[1m\033[32mTrue\033[m"
            else:
                overall_str = "\033[1m\033[31mFalse\033[m"
            data.append([
                "\033[1m" + group_name + "\033[m",
                "\033[1mOverall results ({})\033[m".format(test_count),
                overall_str,
                self.logfile_name,
            ])
        # Make sure the log file announced to the user is complete on disk.
        self.logfile.flush()

        return (
            ("group_name", "test_name", "result", "description"),
            data
        )