1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
import restful_server.qtip_server as server
import pytest
import json
import mock
import time
@pytest.fixture
def app():
    """Expose ``server.app`` (the application under test) as a pytest fixture."""
    application = server.app
    return application
@pytest.fixture
def app_client(app):
    """Return a test client obtained from the ``app`` fixture via ``test_client()``."""
    return app.test_client()
def side_effect_sleep(sleep_time):
    """Block the calling thread for ``sleep_time`` seconds and return ``None``."""
    time.sleep(sleep_time)
    return None
def side_effect_pass():
    """Do nothing and return ``None``; a no-op callable for use with mocks."""
    return None
class TestClass:
    """Tests for the ``/api/v1.0/jobs`` endpoints of the REST server.

    ``args_handler.prepare_and_run_benchmark`` is patched in every test, so
    the real benchmark runner is never invoked.
    """

    # Upper bound (seconds) on how long a test polls for a job to finish.
    JOB_POLL_TIMEOUT = 30
    # Pause (seconds) between two consecutive polls of the job state.
    JOB_POLL_INTERVAL = 0.05

    @staticmethod
    def _body_text(reply):
        """Return the raw response body of ``reply`` decoded as UTF-8 text.

        ``reply.data`` is a byte string; decoding it first keeps substring
        checks working on both Python 2 and Python 3.
        """
        return reply.data.decode('utf-8')

    @classmethod
    def _json_body(cls, reply):
        """Parse the body of ``reply`` as JSON and return the decoded object."""
        return json.loads(cls._body_text(reply))

    @pytest.mark.parametrize("body, expected", [
        ({'installer_type': 'fuel',
          'installer_ip': '10.20.0.2'},
         {'job_id': '',
          'installer_type': 'fuel',
          'installer_ip': '10.20.0.2',
          'pod_name': 'default',
          'suite_name': 'compute',
          'max_minutes': 60,
          'type': 'BM',
          'state': 'finished',
          'state_detail': [{'state': 'finished', 'benchmark': 'dhrystone_bm.yaml'},
                           {'state': 'finished', 'benchmark': 'whetstone_bm.yaml'},
                           {'state': 'finished', 'benchmark': 'ramspeed_bm.yaml'},
                           {'state': 'finished', 'benchmark': 'dpi_bm.yaml'},
                           {'state': 'finished', 'benchmark': 'ssl_bm.yaml'}],
          'result': 0}),
        ({'installer_type': 'fuel',
          'installer_ip': '10.20.0.2',
          'pod_name': 'zte-pod1',
          'max_minutes': 20,
          'suite_name': 'compute',
          'type': 'VM'},
         {'job_id': '',
          'installer_type': 'fuel',
          'installer_ip': '10.20.0.2',
          'pod_name': 'zte-pod1',
          'suite_name': 'compute',
          'max_minutes': 20,
          'type': 'VM',
          'state': 'finished',
          'state_detail': [{u'state': u'finished', u'benchmark': u'dhrystone_vm.yaml'},
                           {u'state': u'finished', u'benchmark': u'whetstone_vm.yaml'},
                           {u'state': u'finished', u'benchmark': u'ramspeed_vm.yaml'},
                           {u'state': u'finished', u'benchmark': u'dpi_vm.yaml'},
                           {u'state': u'finished', u'benchmark': u'ssl_vm.yaml'}],
          'result': 0})
    ])
    @mock.patch('restful_server.qtip_server.args_handler.prepare_and_run_benchmark')
    def test_post_get_delete_job_successful(self, mock_args_handler, app_client, body, expected):
        """POST a job, poll it until it is finished, compare it, then DELETE it.

        ``body`` is the form data sent with the POST request. ``expected``
        holds the fields the finished job must report; its ``job_id`` is
        filled in from the POST reply because it is only known at run time.
        """
        mock_args_handler.return_value = {'result': 0,
                                          'detail': {'host': [(u'10.20.6.14', {'unreachable': 0,
                                                                               'skipped': 13,
                                                                               'ok': 27,
                                                                               'changed': 26,
                                                                               'failures': 0}),
                                                              ('localhost', {'unreachable': 0,
                                                                             'skipped': 0,
                                                                             'ok': 6,
                                                                             'changed': 6,
                                                                             'failures': 0}),
                                                              (u'10.20.6.13', {'unreachable': 0,
                                                                               'skipped': 13,
                                                                               'ok': 27,
                                                                               'changed': 26,
                                                                               'failures': 0})]}}
        reply = app_client.post("/api/v1.0/jobs", data=body)
        print(reply.data)
        job_id = self._json_body(reply)['job_id']
        # Work on a copy so the dict shared through ``parametrize`` is not mutated.
        expected = dict(expected, job_id=job_id)

        # Poll the job until it reports 'finished', but give up after a
        # deadline so a job that never finishes fails the test instead of
        # hanging the whole test run.
        deadline = time.time() + self.JOB_POLL_TIMEOUT
        while True:
            get_reply = app_client.get("/api/v1.0/jobs/%s" % job_id)
            reply_data = self._json_body(get_reply)
            if reply_data['state'] == 'finished':
                break
            assert time.time() < deadline, (
                "job %s did not finish within %s seconds (last state: %r)"
                % (job_id, self.JOB_POLL_TIMEOUT, reply_data['state']))
            time.sleep(self.JOB_POLL_INTERVAL)
        print(reply_data)

        # Compare field by field so a failure names the offending key.
        for key, value in expected.items():
            assert reply_data[key] == value, "unexpected value for %r" % key

        delete_reply = app_client.delete("/api/v1.0/jobs/%s" % job_id)
        assert "successful" in self._body_text(delete_reply)

    @pytest.mark.parametrize("body, expected", [
        ([{'installer_type': 'fuel',
           'installer_ip': '10.20.0.2'},
          {'installer_type': 'compass',
           'installer_ip': '192.168.20.50'}],
         ['job_id',
          'It already has one job running now!'])
    ])
    # The side effect must be a callable: an expression such as
    # ``side_effect_sleep(0.5)`` placed in the decorator arguments is
    # evaluated once when the class body is executed, so the sleep would
    # happen at import time and never when the mock is called.
    @mock.patch('restful_server.qtip_server.args_handler.prepare_and_run_benchmark',
                side_effect=lambda *args, **kwargs: side_effect_sleep(0.5))
    def test_post_two_jobs_unsuccessful(self, mock_args_hanler, app_client, body, expected):
        """POST two jobs back to back and check that the second one is refused.

        ``body`` holds the form data of the two POST requests. ``expected[0]``
        is a key that must be present in the first reply and ``expected[1]``
        is the message that must appear in the second reply.
        """
        reply_1 = app_client.post("/api/v1.0/jobs", data=body[0])
        reply_2 = app_client.post("/api/v1.0/jobs", data=body[1])
        first_job = self._json_body(reply_1)
        assert expected[0] in first_job
        app_client.delete("/api/v1.0/jobs/%s" % first_job['job_id'])
        assert expected[1] in self._body_text(reply_2)
|