1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
##############################################################################
# Copyright (c) 2016 Max Breitenfeldt and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
import logging
import re
import requests
from django.core.cache import cache
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# TODO: implement caching decorator, cache get_* functions
def get_json(url, timeout=10):
    """Return the decoded JSON document served at ``url``, using the cache.

    A successful result is stored in the Django cache under ``url`` for
    180 seconds and served from there on subsequent calls.

    :param url: address to fetch; also used as the cache key.
    :param timeout: seconds to wait for the server before giving up, passed
        straight to ``requests.get``. Without it a stalled server would block
        the caller indefinitely.
    :return: the decoded JSON value, or ``None`` when the request fails, the
        server answers with an HTTP error status, or the body is not valid
        JSON. Failures are logged, never raised.
    """
    # Read the cache exactly once: a second lookup could miss if the entry
    # expires in between, returning None without ever trying to fetch.
    cached = cache.get(url)
    if cached is not None:
        return cached

    try:
        response = requests.get(url, timeout=timeout)
        # Treat 4xx/5xx answers as failures so that an error payload is not
        # cached and handed to callers as if it were the requested document.
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that cannot be decoded as JSON.
        logger.exception(e)
        return None

    cache.set(url, data, 180)  # cache result for 180 seconds
    return data
def get_all_slaves():
    """Return the ``computer`` list from the CI server's computer API.

    Each entry is requested with the ``displayName``, ``offline`` and
    ``idle`` fields. An empty list is returned when ``get_json`` yields
    ``None``.
    """
    payload = get_json(
        "https://build.opnfv.org/ci/computer/api/json?tree=computer[displayName,offline,idle]"
    )
    if payload is None:
        return []
    return payload['computer']  # list of dictionaries
def get_slave(slavename):
    """Return the first slave whose ``displayName`` equals ``slavename``.

    The slaves come from ``get_all_slaves()``. An empty dict is returned
    when no entry matches.
    """
    matching = (
        node for node in get_all_slaves()
        if node['displayName'] == slavename
    )
    return next(matching, {})
def get_ci_slaves():
    """Return the ``nodes`` list published for the ``ci-pod`` label.

    Each entry is requested with the ``nodeName``, ``offline`` and ``idle``
    fields. An empty list is returned when ``get_json`` yields ``None``.
    """
    payload = get_json(
        "https://build.opnfv.org/ci/label/ci-pod/api/json?tree=nodes[nodeName,offline,idle]"
    )
    if payload is None:
        return []
    return payload['nodes']
def get_all_jobs():
    """Return the ``jobs`` list from the CI server's top-level API.

    Each job is requested with its ``displayName``, ``url`` and a
    ``lastBuild`` record (``fullDisplayName``, ``building``, ``builtOn``,
    ``timestamp``, ``result``). An empty list is returned when ``get_json``
    yields ``None``.
    """
    payload = get_json(
        "https://build.opnfv.org/ci/api/json?tree=jobs[displayName,url,lastBuild[fullDisplayName,building,builtOn,timestamp,result]]"
    )
    if payload is None:
        return []
    return payload['jobs']  # list of dictionaries
def get_jenkins_job(slavename):
    """Return the job currently or most recently built on ``slavename``.

    Jobs come from ``get_all_jobs()``. The first job whose last build ran on
    the slave and is still building is returned immediately. Otherwise the
    job whose last build on that slave has the greatest ``timestamp`` is
    returned, or ``None`` when no job qualifies.
    """
    newest_job = None
    newest_timestamp = 0
    for candidate in get_all_jobs():
        build = candidate['lastBuild']
        # Skip jobs that have never been built or that ran elsewhere.
        if build is None or build['builtOn'] != slavename:
            continue
        if build['building'] is True:
            return candidate  # return active build
        if build['timestamp'] > newest_timestamp:
            newest_job = candidate
            newest_timestamp = build['timestamp']
    return newest_job
def is_ci_slave(slavename):
    """Return True if ``slavename`` is the ``nodeName`` of any CI slave.

    The CI slaves are those returned by ``get_ci_slaves()``.
    """
    return any(node['nodeName'] == slavename for node in get_ci_slaves())
def is_dev_pod(slavename):
    """Return True if ``slavename`` contains 'pod' and is not a CI slave.

    The CI-slave check runs first; the name is only inspected when that
    check is negative.
    """
    return not is_ci_slave(slavename) and slavename.find('pod') >= 0
def parse_job(job):
    """Build a summary dict for ``job`` from its ``lastBuild`` record.

    Starts from ``parse_job_string`` applied to the build's
    ``fullDisplayName`` and adds:

    * ``building`` - copied from the last build,
    * ``result``   - the build result, or '' while the build is running,
    * ``url``      - the job's url.
    """
    last_build = job['lastBuild']
    summary = parse_job_string(last_build['fullDisplayName'])
    in_progress = last_build['building']
    summary['building'] = in_progress
    # The result is only read once the build is no longer running.
    summary['result'] = '' if in_progress else last_build['result']
    summary['url'] = job['url']
    return summary
def parse_job_string(full_displayname):
    """Extract scenario, installer, branch and name from a build display name.

    The string is split on spaces and hyphens and each token is classified:

    * ``'os'`` starts a scenario, which is that token and up to three
      following tokens joined with '-';
    * a known installer name sets ``installer``;
    * a known branch name sets ``branch``.

    When several tokens match the same category, the last one wins.
    ``name`` is everything before the first space of the original string.

    :param full_displayname: display name string of a build.
    :return: dict with the keys ``scenario``, ``installer``, ``branch``
        (each '' when not found) and ``name``.
    """
    installers = ('fuel', 'joid', 'apex', 'compass')
    branches = ('master', 'arno', 'brahmaputra', 'colorado')

    job = {'scenario': '', 'installer': '', 'branch': ''}
    tokens = re.split(r'[ -]', full_displayname)
    for index, token in enumerate(tokens):
        if token == 'os':
            # Slicing past the end is safe; a short tail simply yields a
            # shorter scenario string.
            job['scenario'] = '-'.join(tokens[index:index + 4])
        elif token in installers:
            job['installer'] = token
        elif token in branches:
            job['branch'] = token
    job['name'] = full_displayname.split(' ')[0]
    return job
def get_slave_url(slave):
    """Return the CI server page address for ``slave``.

    The slave's ``displayName`` is appended unchanged to the computer
    base address.
    """
    base_address = 'https://build.opnfv.org/ci/computer/'
    return base_address + slave['displayName']
def get_slave_status(slave):
    """Return a human-readable status string for ``slave``.

    'offline' when the slave's ``offline`` flag is set; otherwise
    'online / idle' if its ``idle`` flag is set, else 'online'. The ``idle``
    flag is only read for slaves that are not offline.
    """
    if slave['offline']:
        return 'offline'
    return 'online / idle' if slave['idle'] else 'online'
|