1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
# This software is distributed under the terms and conditions of the 'Apache-2.0'
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
import logging
import json
import shlex
import re
from cliff.lister import Lister
from cliff.command import Command
from uuid import uuid4
import os
import time
import subprocess
class TestsLaunch(Lister):
    """Tests launcher.

    Reads a JSON test description, runs every test of every group, matches
    the captured output against the test's expected regular expression and
    returns one result row per test plus one summary row per group.

    Layout of the JSON document, as read by this class::

        {
            "command_options": "<options appended to every command>",  # optional
            "tests_group": {
                "<group name>": [
                    {
                        "name": "...",
                        "description": "...",
                        "result": "<regular expression>",
                        "command": "<client sub-command>",
                        "command_options": "...",          # optional override
                        # or, when "command" is absent:
                        "external_command": "<program and arguments>"
                    }
                ]
            }
        }

    Named groups captured by a ``result`` pattern are stored in
    ``result_vars`` and may be referenced as ``$name`` in later commands and
    patterns.
    """

    log = logging.getLogger(__name__)
    # Variables captured from previous test outputs.  Defined on the class, so
    # the same dict is shared by every instance.
    result_vars = dict()
    logfile_name = "/tmp/moonclient_test_{}.log".format(time.strftime("%Y%m%d-%H%M%S"))
    # NOTE: the log file is created when the class body is executed (i.e. when
    # the module is imported) and stays open; take_action() flushes it at the
    # end of every run.
    logfile = open(logfile_name, "w")

    def get_parser(self, prog_name):
        """Return the argument parser extended with the ``testfile`` positional."""
        parser = super(TestsLaunch, self).get_parser(prog_name)
        parser.add_argument(
            'testfile',
            metavar='<filename>',
            help='Filename that contains tests to run',
        )
        return parser

    @staticmethod
    def _to_text(raw):
        """Return ``raw`` as text, decoding it when it is a Python 3 bytes object.

        On Python 2 ``bytes`` is ``str`` and the value is returned unchanged.
        """
        if isinstance(raw, bytes) and not isinstance(raw, str):
            return raw.decode("utf-8", "replace")
        return raw

    def __replace_var_in_str(self, data_str):
        """Replace every known ``$name`` reference in ``data_str``.

        References whose name is not a key of ``result_vars`` are left as is.

        :param data_str: command line or pattern that may contain ``$name``
        :return: the string with the known variables expanded
        """
        self.log.debug("__replace_var_in_str " + data_str)
        for exp in re.findall(r"\$\w+", data_str):
            self.log.debug("--->" + exp + str(self.result_vars))
            var_name = exp[1:]
            if var_name in self.result_vars:
                value = self.result_vars[var_name]
                # A callable replacement is inserted literally; a plain string
                # would have its backslash escapes (\1, \g<...>, \n) processed,
                # corrupting or rejecting captured values that contain them.
                data_str = re.sub(
                    re.escape(exp) + r"(?!\w)",
                    lambda _match, value=value: value,
                    data_str,
                )
        self.log.debug("__replace_var_in_str " + data_str)
        return data_str

    def __compare_results(self, expected, observed):
        """Search ``observed`` for the regular expression ``expected``.

        On success the named groups of the match are stored in
        ``result_vars``.

        :return: True when the pattern is found, False otherwise
        """
        match = re.search(expected, observed)
        if match:
            # Optional groups that did not take part in the match are None;
            # storing them would break later "$name" substitutions.
            captured = dict(
                (name, value)
                for name, value in match.groupdict().items()
                if value is not None
            )
            self.result_vars.update(captured)
            return True
        return False

    def _run_external_command(self, test):
        """Run the ``external_command`` of ``test`` as a child process.

        :return: tuple ``(stdout, stderr)`` of the finished process
        """
        ext_command = self.__replace_var_in_str(test["external_command"])
        self.logfile.write("-----> {}\n".format(ext_command))
        self.log.info(" \\-executing external \"{}\"".format(ext_command))
        pipe = subprocess.Popen(shlex.split(ext_command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        raw_out, raw_err = pipe.communicate()
        result_str = self._to_text(raw_out)
        error_str = self._to_text(raw_err)
        self.logfile.write("stdout: {}\n".format(result_str))
        self.logfile.write("stderr: {}\n".format(error_str))
        return result_str, error_str

    def _run_subcommand(self, test, global_command_options):
        """Run the ``command`` of ``test`` through the application.

        The application's stdout is redirected to a temporary file while the
        sub-command runs; the file content is returned and the file removed.

        :param global_command_options: options used when the test does not
            define its own ``command_options``
        :return: everything the sub-command wrote to the application's stdout
        """
        if "command_options" in test:
            options = test["command_options"]
        else:
            options = global_command_options
        command = self.__replace_var_in_str(test["command"] + " " + options)
        self.logfile.write("-----> {}\n".format(command))
        self.log.info(" \\-executing {}".format(command))
        tmp_filename = os.path.join("/tmp", uuid4().hex)
        stdout_back = self.app.stdout
        try:
            with open(tmp_filename, "w") as tmp_fd:
                self.app.stdout = tmp_fd
                try:
                    self.app.run_subcommand(shlex.split(command))
                finally:
                    # Always hand the real stdout back, even when the
                    # sub-command raises.
                    self.app.stdout = stdout_back
            with open(tmp_filename, "r") as tmp_fd:
                result_str = tmp_fd.read()
        finally:
            if os.path.exists(tmp_filename):
                os.remove(tmp_filename)
        self.logfile.write("{}".format(result_str))
        return result_str

    def _run_test(self, group_name, test, global_command_options):
        """Execute one test and build its result row.

        :return: tuple ``(passed, row)`` where ``row`` is
            ``[group_name, test name, coloured result, description]``
        """
        self.log.debug("test={}".format(test))
        error_str = ""
        if "command" in test:
            result_str = self._run_subcommand(test, global_command_options)
        else:
            result_str, error_str = self._run_external_command(test)
        expected = self.__replace_var_in_str(test["result"])
        passed = self.__compare_results(expected, result_str)
        self.logfile.write("----->{} ({})\n\n".format(passed, expected))
        if not passed:
            colored = "\033[1m\033[31mFalse\033[m"
        elif error_str:
            # Passed, but the external command wrote on stderr: show in yellow.
            colored = "\033[33mTrue\033[m"
        else:
            colored = "\033[32mTrue\033[m"
        return passed, [group_name, test["name"], colored, test["description"]]

    def take_action(self, parsed_args):
        """Run all the tests of the file given on the command line.

        :param parsed_args: parsed arguments; ``testfile`` is the JSON file
        :return: tuple ``(column names, rows)``; the rows hold one entry per
            test followed, for each group, by an overall result row that
            also names the log file
        :raises Exception: when no test file name is given
        """
        self.log.info("Write tests output to {}".format(self.logfile_name))
        if not parsed_args.testfile:
            self.log.error("You don't give a test filename.")
            raise Exception("Cannot execute tests.")
        with open(parsed_args.testfile) as tests_fd:
            tests_dict = json.load(tests_fd)
        self.log.debug("tests_dict = {}".format(tests_dict))
        global_command_options = ""
        if "command_options" in tests_dict:
            global_command_options = tests_dict["command_options"]
        data = list()
        try:
            # items() instead of iteritems() so the loop also runs on Python 3.
            for group_name, tests_list in tests_dict["tests_group"].items():
                overall_result = True
                self.log.info("\n\033[1mgroup {}\033[0m".format(group_name))
                self.logfile.write("{}:\n\n".format(group_name))
                test_count = len(tests_list)
                for test in tests_list:
                    passed, row = self._run_test(group_name, test, global_command_options)
                    # A group passes only if every test passed; a later
                    # success must never hide an earlier failure.
                    overall_result = overall_result and passed
                    data.append(row)
                summary = [
                    "\033[1m" + group_name + "\033[m",
                    "\033[1mOverall results ({})\033[m".format(test_count),
                ]
                if overall_result:
                    summary.append("\033[1m\033[32mTrue\033[m")
                else:
                    summary.append("\033[1m\033[31mFalse\033[m")
                summary.append(self.logfile_name)
                data.append(summary)
        finally:
            # The log file stays open for the life of the process; make sure
            # what was written during this run reaches the disk.
            self.logfile.flush()
        return (
            ("group_name", "test_name", "result", "description"),
            data
        )
|