1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# Copyright (c) Authors of Clover
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
from os import path
import yaml
from kubernetes import client, config
class KubeClient(object):
    """Thin convenience wrapper around the Kubernetes Python client.

    Uses the local kubeconfig to build API handles and offers helpers to
    look up services and pods, check whether pods/containers are running,
    and create deployments and services from YAML manifests.
    """

    def __init__(self):
        """Load the local kubeconfig and create the API client handles."""
        config.load_kube_config()
        self.core_v1 = client.CoreV1Api()
        # NOTE(review): ExtensionsV1beta1Api appears to have been dropped
        # from newer kubernetes client releases (Deployments moved to
        # AppsV1Api) -- confirm against the client version pinned here.
        self.extensions_v1beta1 = client.ExtensionsV1beta1Api()

    def find_svc_by_namespace(self, svc_name, namespace='default'):
        """Look up a single service and summarise its labels and selector.

        :param svc_name: name of the service to read.
        :param namespace: namespace the service lives in.
        :returns: ``{name: {'labels': ..., 'selector': ...}}`` for the
            service, or ``None`` (after printing a message) when the API
            call fails or returns nothing.
        """
        try:
            svc = self.core_v1.read_namespaced_service(name=svc_name,
                                                       namespace=namespace)
        except client.rest.ApiException:
            # Best effort: any API error is reported as "not found".
            svc = None
        if not svc:
            print('found no service %s in namespace %s'
                  % (svc_name, namespace))
            return None
        return {
            svc.metadata.name: {
                'labels': svc.metadata.labels,
                'selector': svc.spec.selector,
            },
        }

    def find_pod_by_namespace(self, namespace='default'):
        """List the pods of a namespace together with their labels.

        :param namespace: namespace to list pods from.
        :returns: ``{pod_name: {'labels': ...}}`` for every pod, or
            ``None`` (after printing a message) when the namespace holds
            no pods.
        """
        pods = self.core_v1.list_namespaced_pod(namespace=namespace)
        # The list response object itself is always truthy; the emptiness
        # check has to look at its ``items``.
        if not pods or not pods.items:
            print('found no pod')
            return None
        ret_dict = {}
        for pod in pods.items:
            ret_dict[pod.metadata.name] = {'labels': pod.metadata.labels}
        return ret_dict

    def _check_pod(self, pod_name, namespace='default', container_name=None):
        """Check whether a pod matching ``pod_name`` is running.

        A pod matches when ``pod_name`` is a substring of its full name
        (so a deployment name matches its generated pod names).

        :param pod_name: full pod name or a fragment of it.
        :param namespace: namespace to search.
        :param container_name: when given, only containers with exactly
            this name are inspected; other containers in the pod (for
            example the application next to a sidecar) are ignored.
            When omitted, every container of the pod must be running.
        :returns: ``(is_up, full_pod_name)``. ``full_pod_name`` is the
            name of the last matching pod found running, or ``None``
            when no matching pod is up.
        """
        pods = self.core_v1.list_namespaced_pod(namespace=namespace)
        ret_code = False
        new_pod_name = None
        for pod in pods.items:
            if pod_name not in pod.metadata.name:
                continue
            # container_statuses is None until the pod has been scheduled.
            statuses = pod.status.container_statuses or []
            if container_name:
                statuses = [status for status in statuses
                            if status.name == container_name]
            # At least one relevant container, and all of them running.
            if statuses and all(status.state.running is not None
                                for status in statuses):
                ret_code = True
                new_pod_name = pod.metadata.name
        return ret_code, new_pod_name

    def check_pod_up(self, pod_name, namespace='default'):
        """Return ``(is_up, full_pod_name)`` for a pod matching ``pod_name``.

        The pod counts as up only when all of its containers are running.
        """
        return self._check_pod(pod_name, namespace)

    def check_container_in_pods(self, container_name, pods, namespace='default'):
        """Check that ``container_name`` is running in every listed pod.

        :param container_name: name of the container to look for.
        :param pods: iterable of pod names (or name fragments).
        :param namespace: namespace the pods live in.
        :returns: ``True`` only when ``pods`` is non-empty and the
            container is running in each of them; ``False`` otherwise.
        """
        running = False
        for pod in pods:
            running, _ = self._check_pod(pod, namespace, container_name)
            if not running:
                # Stop at the first pod that lacks a running container.
                return False
        return running

    @staticmethod
    def _load_manifest(yaml_path):
        """Parse the YAML manifest at ``yaml_path`` into plain objects."""
        with open(yaml_path) as fp:
            # safe_load: Kubernetes manifests are plain data, and a bare
            # yaml.load() can construct arbitrary Python objects (and is
            # rejected without an explicit Loader by recent PyYAML).
            return yaml.safe_load(fp)

    def create_deployment_yaml(self, deployment_yaml_path, namespace='default'):
        """Create a deployment from a YAML manifest file.

        :param deployment_yaml_path: path to the deployment manifest.
        :param namespace: namespace to create the deployment in.
        :returns: the deployment name taken from the manifest metadata.
        """
        body = self._load_manifest(deployment_yaml_path)
        resp = self.extensions_v1beta1.create_namespaced_deployment(
            body=body, namespace=namespace)
        print('Deployment created. Status=%s' % str(resp.status))
        return body.get('metadata').get('name')

    def create_service_yaml(self, service_yaml_path, namespace='default'):
        """Create a service from a YAML manifest file.

        :param service_yaml_path: path to the service manifest.
        :param namespace: namespace to create the service in.
        :returns: the service name taken from the manifest metadata.
        """
        body = self._load_manifest(service_yaml_path)
        # Services belong to the core/v1 API group, so the call must go
        # through CoreV1Api rather than the extensions API handle.
        resp = self.core_v1.create_namespaced_service(
            body=body, namespace=namespace)
        print('Service created. Status=%s' % str(resp.status))
        return body.get('metadata').get('name')
|