summaryrefslogtreecommitdiffstats
path: root/moonclient/moonclient/tests.py
blob: 6484009ff1e909a48364c2adea499add4e958714 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
# This software is distributed under the terms and conditions of the 'Apache-2.0'
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.

import logging
import json
import shlex
import re
from cliff.lister import Lister
from cliff.command import Command
from uuid import uuid4
import os
import time
import subprocess


class TestsLaunch(Lister):
    """Run the test groups described in a JSON file and list the results.

    The JSON document read by :meth:`take_action` is expected to be a
    dictionary with:

    * ``tests_group``: a dictionary mapping a group name to a list of test
      entries;
    * ``command_options`` (optional): options appended to every ``command``
      entry that does not define its own ``command_options``.

    A test entry is a dictionary of one of these kinds:

    * an authentication change, recognised by the presence of one of the
      ``auth_*`` keys listed in ``_AUTH_KEYS``;
    * a client sub-command (``command``), run through
      ``self.app.run_subcommand``;
    * an ``external_command`` (run without a shell) or a ``shell_command``
      (run through the shell).

    For the last two kinds, ``name`` and ``result`` are required and
    ``description`` is optional. ``result`` is a regular expression searched
    in the command output; its named groups are stored in ``result_vars``
    and can be referenced as ``$name`` in later commands and results.
    """

    log = logging.getLogger(__name__)
    # Variables captured from previous results (named regex groups).
    # NOTE: this is a class attribute, hence shared by every instance.
    result_vars = dict()
    # NOTE: the log file is opened when the class is defined (import time)
    # and is never explicitly closed; kept as is because it is part of the
    # class interface.
    logfile_name = "/tmp/moonclient_test_{}.log".format(time.strftime("%Y%m%d-%H%M%S"))
    logfile = open(logfile_name, "w")

    # Keys whose presence turns a test entry into an authentication change.
    # "auth_url" is kept for backward compatibility although it is not read.
    _AUTH_KEYS = ("auth_name", "auth_password", "auth_host", "auth_port", "auth_url")

    def get_parser(self, prog_name):
        """Return the argument parser, extended with the ``testfile`` argument."""
        parser = super(TestsLaunch, self).get_parser(prog_name)
        parser.add_argument(
            'testfile',
            metavar='<filename>',
            help='Filename that contains tests to run',
        )
        return parser

    @staticmethod
    def __to_text(raw):
        """Return *raw* as text, decoding it when it is a bytes object.

        On Python 2 ``bytes`` is ``str`` so the value is returned untouched;
        on Python 3 the output of ``Popen.communicate`` is decoded so that it
        can be searched with a text regular expression.
        """
        if isinstance(raw, bytes) and not isinstance(raw, str):
            return raw.decode("utf-8", "replace")
        return raw

    def __replace_var_in_str(self, data_str):
        """Replace every known ``$name`` placeholder of *data_str*.

        A placeholder is ``$`` followed by word characters. It is replaced by
        ``result_vars[name]`` when that variable exists and is not ``None``;
        otherwise it is left untouched. Values are inserted literally (a
        backslash in a value is not interpreted) and are not scanned again
        for further placeholders.

        :param data_str: the string that may contain placeholders
        :return: the string with the known placeholders substituted
        """
        self.log.debug("__replace_var_in_str " + data_str)

        def substitute(match):
            self.log.debug("--->" + match.group(0) + str(self.result_vars))
            value = self.result_vars.get(match.group(1))
            if value is None:
                return match.group(0)
            return value

        # A function is used as replacement so that the value is never
        # parsed as a regular expression replacement template.
        data_str = re.sub(r"\$(\w+)", substitute, data_str)
        self.log.debug("__replace_var_in_str " + data_str)
        return data_str

    def __compare_results(self, expected, observed):
        """Search the regular expression *expected* in *observed*.

        On success, the named groups of the match are merged into
        ``result_vars``.

        :param expected: regular expression describing the expected output
        :param observed: output produced by the command
        :return: True if the expression was found, False otherwise
        """
        match = re.search(expected, observed)
        if match:
            self.result_vars.update(match.groupdict())
            return True
        return False

    def __change_auth(self, test):
        """Apply the authentication change described by *test*.

        Calls ``self.app.auth_keystone`` with the user name, password, host
        and port found in *test* (``None`` for the missing ones), logs the
        change and returns the corresponding result row.
        """
        username = test.get("auth_name")
        password = test.get("auth_password")
        host = test.get("auth_host")
        port = test.get("auth_port")
        description = test.get("description", "")
        self.app.auth_keystone(username, password, host, port)
        title = "Change auth to "
        if username:
            title += username
        if host:
            title += "@" + host
        if port:
            # The port may be written as a number in the JSON file.
            title += ":" + str(port)
        title += "\n"
        self.logfile.write(title + "\n")
        self.log.info(title)
        return ["", title.strip(), "\033[32mOK\033[m", description.strip()]

    def __run_external(self, test):
        """Run the ``external_command`` or ``shell_command`` of *test*.

        :return: a ``(stdout, stderr)`` tuple of text
        :raises KeyError: if *test* defines none of the two keys
        """
        use_shell = "external_command" not in test
        if use_shell:
            ext_command = test["shell_command"]
        else:
            ext_command = test["external_command"]
        ext_command = self.__replace_var_in_str(ext_command)
        self.logfile.write("-----> {}\n".format(ext_command))
        self.log.info("    \\-executing external \"{}\"".format(ext_command))
        if use_shell:
            # Note (asteroide): security hazard! Must reduce the possible commands here.
            pipe = subprocess.Popen(ext_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        else:
            pipe = subprocess.Popen(shlex.split(ext_command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        com = pipe.communicate()
        result_str = self.__to_text(com[0])
        error_str = self.__to_text(com[1])
        self.logfile.write("stdout: {}\n".format(result_str))
        self.logfile.write("stderr: {}\n".format(error_str))
        return result_str, error_str

    def __run_subcommand(self, test, global_command_options):
        """Run the client sub-command of *test* and return what it printed.

        ``self.app.stdout`` is redirected to a temporary file while the
        sub-command runs; it is always restored and the temporary file is
        always removed, even if the sub-command raises.

        :param test: the test entry (must contain ``command``)
        :param global_command_options: options used when *test* has no
            ``command_options`` of its own
        :return: the text written by the sub-command on ``self.app.stdout``
        """
        if "command_options" in test:
            command = test["command"] + " " + test["command_options"]
        else:
            command = test["command"] + " " + global_command_options
        command = self.__replace_var_in_str(command)
        self.logfile.write("-----> {}\n".format(command))
        self.log.info("    \\-executing {}".format(command))
        tmp_filename = os.path.join("/tmp", uuid4().hex)
        stdout_back = self.app.stdout
        try:
            with open(tmp_filename, "w") as tmp_filename_fd:
                self.app.stdout = tmp_filename_fd
                try:
                    self.app.run_subcommand(shlex.split(command))
                finally:
                    self.app.stdout = stdout_back
            with open(tmp_filename, "r") as tmp_filename_fd:
                result_str = tmp_filename_fd.read()
        finally:
            if os.path.exists(tmp_filename):
                os.remove(tmp_filename)
        self.logfile.write("{}".format(result_str))
        return result_str

    @staticmethod
    def __colorize(compare, error_str, overall_result):
        """Return the coloured "True"/"False" cell of a test.

        Red for a failure, yellow for a success that wrote on stderr, green
        for a success in a group that has not failed so far, and uncoloured
        for a success in a group that already contains a failure.
        """
        if not compare:
            return "\033[1m\033[31mFalse\033[m"
        if error_str:
            return "\033[33mTrue\033[m"
        if overall_result:
            return "\033[32mTrue\033[m"
        return "\033[mTrue\033[m"

    def take_action(self, parsed_args):
        """Run every test of ``parsed_args.testfile``.

        :param parsed_args: parsed arguments; ``testfile`` is the path of the
            JSON file describing the tests
        :return: a ``(column names, rows)`` tuple; rows contain one line per
            test or authentication change, plus one summary line per group
            whose last cell is the name of the log file
        :raises Exception: if no test file name was given
        """
        self.log.info("Write tests output to {}".format(self.logfile_name))
        if not parsed_args.testfile:
            self.log.error("You don't give a test filename.")
            raise Exception("Cannot execute tests.")
        with open(parsed_args.testfile) as tests_fd:
            tests_dict = json.load(tests_fd)
        self.log.debug("tests_dict = {}".format(tests_dict))
        global_command_options = tests_dict.get("command_options", "")
        data = list()
        for group_name, tests_list in tests_dict["tests_group"].items():
            overall_result = True
            self.log.info("\n\033[1mgroup {}\033[0m".format(group_name))
            self.logfile.write("{}:\n\n".format(group_name))
            test_count = len(tests_list)
            for test in tests_list:
                if any(key in test for key in self._AUTH_KEYS):
                    data.append(self.__change_auth(test))
                    continue
                self.log.debug("test={}".format(test))
                error_str = ""
                if "command" in test:
                    result_str = self.__run_subcommand(test, global_command_options)
                else:
                    result_str, error_str = self.__run_external(test)
                expected = self.__replace_var_in_str(test["result"])
                compare = self.__compare_results(expected, result_str)
                self.logfile.write("\\---->{}: {}\n\n".format(compare, expected))
                overall_result = overall_result and compare
                data.append([
                    group_name,
                    test["name"],
                    self.__colorize(compare, error_str, overall_result),
                    test.get("description", ""),
                ])
            if overall_result:
                overall_str = "\033[1m\033[32mTrue\033[m"
            else:
                overall_str = "\033[1m\033[31mFalse\033[m"
            data.append([
                "\033[1m" + group_name + "\033[m",
                "\033[1mOverall results ({})\033[m".format(test_count),
                overall_str,
                self.logfile_name,
            ])
        # Make sure the log file mentioned in the summary rows is up to date.
        self.logfile.flush()

        return (
            ("group_name", "test_name", "result", "description"),
            data
        )