summaryrefslogtreecommitdiffstats
path: root/clover/orchestration/kube_client.py
blob: e5f1d892c430911b4a2414eff0640dcb3a004fa1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Copyright (c) Authors of Clover
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0

from os import path
import yaml

from kubernetes import client, config

class KubeClient(object):

    def __init__(self):
        config.load_kube_config()
        self.core_v1 = client.CoreV1Api()
        self.extensions_v1beta1 = client.ExtensionsV1beta1Api()

    def find_svc_by_namespace(self, svc_name, namespace='default'):
        ret_dict = {}
        try:
            svc = self.core_v1.read_namespaced_service(name=svc_name,
                                                       namespace=namespace)
        except client.rest.ApiException:
            svc = None
        if not svc:
            print('found no service %s in namespace %s' \
                   % (svc_name, namespace))
            return None
        ret_dict[svc.metadata.name] = {}
        ret_dict[svc.metadata.name]['labels'] = svc.metadata.labels
        ret_dict[svc.metadata.name]['selector'] = svc.spec.selector

        return ret_dict

    def find_pod_by_namespace(self, namespace='default'):
        ret_dict = {}
        pods = self.core_v1.list_namespaced_pod(namespace=namespace)
        if not pods:
            print('found no pod')
            return None
        for pod in pods.items:
            if pod.metadata.name not in ret_dict:
                ret_dict[pod.metadata.name] = {}
            ret_dict[pod.metadata.name]['labels'] = pod.metadata.labels

        return ret_dict

    def _check_pod(self, pod_name, namespace='defualt', container_name=None):
        ret = self.core_v1.list_namespaced_pod(namespace=namespace)
        ret_code = False
        new_pod_name = None
        for i in ret.items:
            if pod_name in i.metadata.name:
                if i.status.container_statuses and len(i.status.container_statuses) > 0:
                    container_up = False
                    for container in i.status.container_statuses:
                        check_state = True
                        if container_name:
                            if container_name != container.name:
                                check_state = False
                        if check_state and container.state.running is not None:
                            container_up = True
                        else:
                            if container_up:
                                container_up = False
                                break
                    if container_up:
                        ret_code = True
                        new_pod_name = i.metadata.name
        return ret_code, new_pod_name

    def check_pod_up(self, pod_name, namespace='default'):
        return self._check_pod(pod_name, namespace)

    def check_container_in_pods(self, container_name, pods, namespace='default'):
        ret = False
        for pod in pods:
            ret, _ = self._check_pod(pod, namespace, container_name)
            if not ret:
                return ret
        return ret

    def create_deployment_yaml(self, deployment_yaml_path, namespace='default'):
        with open(deployment_yaml_path) as fp:
            body = yaml.load(fp)
            resp = self.extensions_v1beta1.create_namespaced_deployment(
                    body=body, namespace=namespace)
            print('Deployment created. Status=%s' % str(resp.status))

            dep_name = body.get('metadata').get('name')
            return dep_name

    def create_service_yaml(self, service_yaml_path, namespace='default'):
        with open(service_yaml_path) as fp:
            body = yaml.load(fp)
            resp = self.extensions_v1beta1.create_namespaced_service(
                    body=body, namespace=namespace)
            print('Service created. Status=%s' % str(resp.status))

            svc_name = body.get('metadata').get('name')
            return svc_name