summaryrefslogtreecommitdiffstats
path: root/moonclient/moonclient/tests.py
blob: a97ba61c544280b021f7ce212d47e3a22140395b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
# This software is distributed under the terms and conditions of the 'Apache-2.0'
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.

import logging
import json
import shlex
import re
from cliff.lister import Lister
from cliff.command import Command
from uuid import uuid4
import os
import time
import subprocess


class TestsLaunch(Lister):
    """Run the tests described in a JSON file and list their results.

    The JSON document read by ``take_action`` is used as follows:

    * ``command_options`` (optional): options appended to every ``command``
      that does not carry its own ``command_options``.
    * ``tests_group``: mapping of a group name to a list of tests.

    Each test provides ``name``, ``description``, ``result`` (a regular
    expression searched in the captured output) and either ``command``
    (a sub-command run through ``self.app.run_subcommand``) or
    ``external_command`` (run as a subprocess).

    Named groups matched by a ``result`` expression are stored in
    ``result_vars`` and can be referenced as ``$name`` in later commands and
    expected results.
    """

    log = logging.getLogger(__name__)
    # Values captured from named groups of the "result" expressions.
    # Defined on the class, hence shared by every instance.
    result_vars = dict()
    logfile_name = "/tmp/moonclient_test_{}.log".format(time.strftime("%Y%m%d-%H%M%S"))
    # Opened as soon as the class body is executed; every test run writes its
    # detailed output to this file.
    logfile = open(logfile_name, "w")

    def get_parser(self, prog_name):
        """Return the argument parser, extended with the ``testfile`` argument."""
        parser = super(TestsLaunch, self).get_parser(prog_name)
        parser.add_argument(
            'testfile',
            metavar='<filename>',
            help='Filename that contains tests to run',
        )
        return parser

    @staticmethod
    def __to_text(data):
        """Return ``data`` as text, decoding it when it is a bytes object.

        On Python 3 ``Popen.communicate()`` returns bytes, which cannot be
        searched with a text pattern. On Python 2 ``bytes`` is ``str`` and the
        value is returned unchanged.
        """
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode("utf-8", "replace")
        return data

    def __replace_var_in_str(self, data_str):
        """Replace every known ``$name`` placeholder of ``data_str``.

        :param data_str: string that may contain ``$name`` placeholders
        :return: the string where each placeholder whose name is a key of
            ``result_vars`` is replaced by the stored value; unknown
            placeholders, and those whose stored value is ``None``, are left
            untouched
        """
        self.log.debug("__replace_var_in_str " + data_str)

        def substitute(match):
            placeholder = match.group(0)
            self.log.debug("--->" + placeholder + str(self.result_vars))
            value = self.result_vars.get(match.group(1))
            # groupdict() stores None for a named group that did not take part
            # in the match; keep the placeholder rather than failing on it.
            if value is None:
                return placeholder
            return value

        # A callable replacement inserts the value literally: backslashes in a
        # captured value are not interpreted as escapes or group references.
        # "\w+" is greedy, so "$foobar" is never mistaken for "$foo".
        data_str = re.sub(r"\$(\w+)", substitute, data_str)
        self.log.debug("__replace_var_in_str " + data_str)
        return data_str

    def __compare_results(self, expected, observed):
        """Search the ``expected`` regular expression in ``observed``.

        On success the named groups of the match are merged into
        ``result_vars``.

        :return: True when the expression matches, False otherwise
        """
        match = re.search(expected, observed)
        if match:
            self.result_vars.update(match.groupdict())
            return True
        return False

    def __run_external(self, test):
        """Run the ``external_command`` of ``test`` as a subprocess.

        :return: tuple ``(stdout, stderr)`` of the finished process, as text
        """
        ext_command = self.__replace_var_in_str(test["external_command"])
        self.logfile.write("-----> {}\n".format(ext_command))
        self.log.info("    \\-executing external \"{}\"".format(ext_command))
        pipe = subprocess.Popen(shlex.split(ext_command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        com = pipe.communicate()
        result_str = self.__to_text(com[0])
        error_str = self.__to_text(com[1])
        self.logfile.write("stdout: {}\n".format(result_str))
        self.logfile.write("stderr: {}\n".format(error_str))
        return result_str, error_str

    def __run_internal(self, test, global_command_options):
        """Run the ``command`` of ``test`` through the application.

        The application's stdout is redirected to a temporary file while the
        sub-command runs, then the content of that file is returned.

        :param global_command_options: options used when the test has no
            ``command_options`` of its own
        :return: everything the sub-command wrote to the application's stdout
        """
        if "command_options" in test:
            command = test["command"] + " " + test["command_options"]
        else:
            command = test["command"] + " " + global_command_options
        command = self.__replace_var_in_str(command)
        self.logfile.write("-----> {}\n".format(command))
        self.log.info("    \\-executing {}".format(command))
        tmp_filename = os.path.join("/tmp", uuid4().hex)
        stdout_back = self.app.stdout
        try:
            with open(tmp_filename, "w") as tmp_filename_fd:
                self.app.stdout = tmp_filename_fd
                try:
                    self.app.run_subcommand(shlex.split(command))
                finally:
                    # Always give the application its real stdout back, even
                    # when the sub-command raises.
                    self.app.stdout = stdout_back
            with open(tmp_filename, "r") as tmp_filename_fd:
                result_str = tmp_filename_fd.read()
        finally:
            # The capture file is only a transport; do not leave it in /tmp.
            if os.path.exists(tmp_filename):
                os.remove(tmp_filename)
        self.logfile.write("{}".format(result_str))
        return result_str

    def take_action(self, parsed_args):
        """Run every test of the file given on the command line.

        :param parsed_args: parsed arguments; ``testfile`` is the path of the
            JSON file describing the tests
        :return: tuple ``(column names, rows)``; one row per test plus one
            "Overall results" row per group
        :raises Exception: when no test filename is given
        """
        self.log.info("Write tests output to {}".format(self.logfile_name))
        if not parsed_args.testfile:
            self.log.error("You don't give a test filename.")
            raise Exception("Cannot execute tests.")
        with open(parsed_args.testfile) as testfile_fd:
            tests_dict = json.load(testfile_fd)
        self.log.debug("tests_dict = {}".format(tests_dict))
        global_command_options = tests_dict.get("command_options", "")
        data = list()
        for group_name, tests_list in tests_dict["tests_group"].items():
            overall_result = True
            self.log.info("\n\033[1mgroup {}\033[0m".format(group_name))
            self.logfile.write("{}:\n\n".format(group_name))
            for test in tests_list:
                self.log.debug("test={}".format(test))
                error_str = ""
                if "command" in test:
                    result_str = self.__run_internal(test, global_command_options)
                else:
                    result_str, error_str = self.__run_external(test)
                expected = self.__replace_var_in_str(test["result"])
                passed = self.__compare_results(expected, result_str)
                self.logfile.write("----->{} ({})\n\n".format(passed, expected))
                # A group is successful only if all of its tests pass; a later
                # success must never hide an earlier failure.
                overall_result = overall_result and passed
                if not passed:
                    shown = "\033[1m\033[31mFalse\033[m"
                elif error_str:
                    # Passed, but the external command wrote to stderr.
                    shown = "\033[33mTrue\033[m"
                else:
                    shown = "\033[32mTrue\033[m"
                data.append([group_name, test["name"], shown, test["description"]])
            if overall_result:
                overall_shown = "\033[1m\033[32mTrue\033[m"
            else:
                overall_shown = "\033[1m\033[31mFalse\033[m"
            data.append([
                "\033[1m" + group_name + "\033[m",
                "\033[1mOverall results ({})\033[m".format(len(tests_list)),
                overall_shown,
                self.logfile_name,
            ])
        # The summary rows point the user to the log file: make sure its
        # content is on disk before the table is displayed.
        self.logfile.flush()

        return (
            ("group_name", "test_name", "result", "description"),
            data
        )