1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
# This software is distributed under the terms and conditions of the 'Apache-2.0'
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
import logging
import json
import shlex
import re
from cliff.lister import Lister
from cliff.command import Command
from uuid import uuid4
import os
import time
import subprocess
class TestsLaunch(Lister):
"""Tests launcher."""
log = logging.getLogger(__name__)
result_vars = dict()
logfile_name = "/tmp/moonclient_test_{}.log".format(time.strftime("%Y%m%d-%H%M%S"))
logfile = open(logfile_name, "w")
def get_parser(self, prog_name):
parser = super(TestsLaunch, self).get_parser(prog_name)
parser.add_argument(
'testfile',
metavar='<filename>',
help='Filename that contains tests to run',
)
return parser
def __replace_var_in_str(self, data_str):
self.log.debug("__replace_var_in_str " + data_str)
for exp in re.findall("\$\w+", data_str):
self.log.debug("--->" + exp + str(self.result_vars))
if exp.replace("$", "") in self.result_vars:
data_str = re.sub(exp.replace("$", "\$") + "(?!\w)", self.result_vars[exp.replace("$", "")], data_str)
self.log.debug("__replace_var_in_str " + data_str)
return data_str
def __compare_results(self, expected, observed):
match = re.search(expected, observed)
if match:
self.result_vars.update(match.groupdict())
return True
return False
def take_action(self, parsed_args):
self.log.info("Write tests output to {}".format(self.logfile_name))
stdout_back = self.app.stdout
if not parsed_args.testfile:
self.log.error("You don't give a test filename.")
raise Exception("Cannot execute tests.")
tests_dict = json.load(open(parsed_args.testfile))
self.log.debug("tests_dict = {}".format(tests_dict))
global_command_options = ""
if "command_options" in tests_dict:
global_command_options = tests_dict["command_options"]
data = list()
for group_name, tests_list in tests_dict["tests_group"].iteritems():
overall_result = True
self.log.info("\n\033[1mgroup {}\033[0m".format(group_name))
self.logfile.write("{}:\n\n".format(group_name))
test_count = len(tests_list)
for test in tests_list:
result_str = ""
error_str = ""
if "auth_name" in test or "auth_password" in test or "auth_url" in test:
username = None
password = None
host = None
port = None
description = ""
if "auth_name" in test:
username = test["auth_name"]
if "auth_password" in test:
password = test["auth_password"]
if "auth_host" in test:
host = test["auth_host"]
if "auth_port" in test:
port = test["auth_port"]
if "description" in test:
description = test["description"]
self.app.auth_keystone(username, password, host, port)
title = "Change auth to "
if username:
title += username
if host:
title += "@" + host
if port:
title += ":" + port
title += "\n"
self.logfile.write(title)
self.log.info(title)
data_tmp = list()
data_tmp.append("")
data_tmp.append(title.strip())
data_tmp.append("\033[32mOK\033[m")
data_tmp.append(description.strip())
data.append(data_tmp)
continue
data_tmp = list()
tmp_filename = os.path.join("/tmp", uuid4().hex)
tmp_filename_fd = open(tmp_filename, "w")
self.log.debug("test={}".format(test))
if "command" not in test:
if "external_command" in test:
ext_command = test["external_command"]
else:
ext_command = test["shell_command"]
ext_command = self.__replace_var_in_str(ext_command)
self.logfile.write("-----> {}\n".format(ext_command))
self.log.info(" \\-executing external \"{}\"".format(ext_command))
if "external_command" in test:
pipe = subprocess.Popen(shlex.split(ext_command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
# Note (asteroide): security hazard! Must reduce the possible commands here.
pipe = subprocess.Popen(ext_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
com = pipe.communicate()
result_str = com[0]
error_str = com[1]
self.logfile.write("stdout: {}\n".format(result_str))
self.logfile.write("stderr: {}\n".format(error_str))
if "command" in test:
if "command_options" in test:
command = test["command"] + " " + test["command_options"]
else:
command = test["command"] + " " + global_command_options
command = self.__replace_var_in_str(command)
self.logfile.write("-----> {}\n".format(command))
self.log.info(" \\-executing {}".format(command))
self.app.stdout = tmp_filename_fd
result_id = self.app.run_subcommand(shlex.split(command))
tmp_filename_fd.close()
self.app.stdout = stdout_back
result_str = open(tmp_filename, "r").read()
self.logfile.write("{}".format(result_str))
data_tmp.append(group_name)
data_tmp.append(test["name"])
compare = self.__compare_results(self.__replace_var_in_str(test["result"]), result_str)
self.logfile.write("----->{} ({})\n\n".format(compare, self.__replace_var_in_str(test["result"])))
if error_str:
if compare:
compare = "\033[33mTrue\033[m"
overall_result = overall_result and True
else:
compare = "\033[1m\033[31mFalse\033[m"
overall_result = overall_result and False
else:
overall_result = overall_result and compare
if compare:
if overall_result:
compare = "\033[32mTrue\033[m"
else:
compare = "\033[mTrue\033[m"
else:
compare = "\033[1m\033[31mFalse\033[m"
data_tmp.append(compare)
data_tmp.append(test["description"])
data.append(data_tmp)
data_tmp = list()
data_tmp.append("\033[1m" + group_name + "\033[m")
data_tmp.append("\033[1mOverall results ({})\033[m".format(test_count))
if overall_result:
data_tmp.append("\033[1m\033[32mTrue\033[m")
else:
data_tmp.append("\033[1m\033[31mFalse\033[m")
data_tmp.append(self.logfile_name)
data.append(data_tmp)
return (
("group_name", "test_name", "result", "description"),
data
)
|