Diffstat (limited to 'sdv/docker/sdvstate/internal/validator')
-rw-r--r--  sdv/docker/sdvstate/internal/validator/airship/probe_check.py  13
-rw-r--r--  sdv/docker/sdvstate/internal/validator/kuberef/helm_check.py  35
-rw-r--r--  sdv/docker/sdvstate/internal/validator/kuberef/kuberef.py  29
-rw-r--r--  sdv/docker/sdvstate/internal/validator/kuberef/kubevirt_health_check.py  44
-rw-r--r--  sdv/docker/sdvstate/internal/validator/kuberef/monitoring_agent_checker.py  102
-rw-r--r--  sdv/docker/sdvstate/internal/validator/kuberef/node_exporter_checker.py  65
-rw-r--r--  sdv/docker/sdvstate/internal/validator/kuberef/plugin_check.py  152
-rw-r--r--  sdv/docker/sdvstate/internal/validator/kuberef/policy_checks.py  123
-rw-r--r--  sdv/docker/sdvstate/internal/validator/kuberef/security_check.py  272
9 files changed, 830 insertions, 5 deletions
diff --git a/sdv/docker/sdvstate/internal/validator/airship/probe_check.py b/sdv/docker/sdvstate/internal/validator/airship/probe_check.py
index 670bd9a..96842c1 100644
--- a/sdv/docker/sdvstate/internal/validator/airship/probe_check.py
+++ b/sdv/docker/sdvstate/internal/validator/airship/probe_check.py
@@ -19,11 +19,11 @@ Probe Checks
2. Liveness
3. Startup
"""
-
+import logging
from tools.kube_utils import kube_api
from tools.conf import settings
-from .store_result import store_result
+from internal.store_result import store_result
def readiness_probe_check():
@@ -31,6 +31,7 @@ def readiness_probe_check():
Checks whether the readiness probe is configured for all overcloud
components deployed as pods on undercloud Kubernetes.
"""
+ logger = logging.getLogger(__name__)
api = kube_api()
namespace_list = settings.getValue('airship_namespace_list')
@@ -61,7 +62,7 @@ def readiness_probe_check():
pod_stats['containers'].append(container_stats)
result['details'].append(pod_stats)
- store_result(result)
+ store_result(logger, result)
return result
def liveness_probe_check():
@@ -69,6 +70,7 @@ def liveness_probe_check():
Checks whether the liveness probe is configured for all overcloud
components deployed as pods on undercloud Kubernetes.
"""
+ logger = logging.getLogger(__name__)
api = kube_api()
namespace_list = settings.getValue('airship_namespace_list')
@@ -99,7 +101,7 @@ def liveness_probe_check():
pod_stats['containers'].append(container_stats)
result['details'].append(pod_stats)
- store_result(result)
+ store_result(logger, result)
return result
def startup_probe_check():
@@ -107,6 +109,7 @@ def startup_probe_check():
Checks whether the startup probe is configured for all overcloud
components deployed as pods on undercloud Kubernetes.
"""
+ logger = logging.getLogger(__name__)
api = kube_api()
namespace_list = settings.getValue('airship_namespace_list')
@@ -137,5 +140,5 @@ def startup_probe_check():
pod_stats['containers'].append(container_stats)
result['details'].append(pod_stats)
- store_result(result)
+ store_result(logger, result)
return result
diff --git a/sdv/docker/sdvstate/internal/validator/kuberef/helm_check.py b/sdv/docker/sdvstate/internal/validator/kuberef/helm_check.py
new file mode 100644
index 0000000..55f4052
--- /dev/null
+++ b/sdv/docker/sdvstate/internal/validator/kuberef/helm_check.py
@@ -0,0 +1,35 @@
+"""
+Helm 2 disabled check
+
+Checks whether Helm v2 is supported in the cluster
+"""
+
+import logging
+from tools.kube_utils import kube_api
+from tools.conf import settings
+from internal.store_result import store_result
+
+def helmv2_disabled_check():
+ """
+ Checks for helm v2 support
+ """
+ result = {'category': 'platform',
+ 'case_name': 'helmv2_disabled_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+ kube = kube_api()
+ logger = logging.getLogger(__name__)
+ res = False
+ pod_details = kube.list_pod_for_all_namespaces()
+ pods = pod_details.items
+ version_support = settings.getValue('pdf_file')['vim_functional']['legacy_helm_support']
+ if 'YES' in version_support:
+ for pod in pods:
+ if 'tiller' in pod.metadata.name:
+ res = True
+ result['details'].append(pod)
+ if res is False:
+ result['criteria'] = 'fail'
+ store_result(logger, result)
+ return result
diff --git a/sdv/docker/sdvstate/internal/validator/kuberef/kuberef.py b/sdv/docker/sdvstate/internal/validator/kuberef/kuberef.py
index 4768e81..f42c723 100644
--- a/sdv/docker/sdvstate/internal/validator/kuberef/kuberef.py
+++ b/sdv/docker/sdvstate/internal/validator/kuberef/kuberef.py
@@ -22,6 +22,14 @@ from datetime import datetime as dt
from internal import store_result
from internal.validator.validator import Validator
+from internal.validator.kuberef.policy_checks import topology_manager_policy_check, cpu_manager_policy_check
+from internal.validator.kuberef.security_check import capability_check, privilege_check, host_network_check
+from internal.validator.kuberef.security_check import host_path_vol_check, k8s_api_conn_check
+from internal.validator.kuberef.monitoring_agent_checker import collectd_check, monitoring_agent_check
+from internal.validator.kuberef.node_exporter_checker import node_exporter_check
+from internal.validator.kuberef.plugin_check import cni_plugin_check, multi_interface_cni_check
+from internal.validator.kuberef.helm_check import helmv2_disabled_check
+from internal.validator.kuberef.kubevirt_health_check import kubevirt_check
from tools.conf import settings
from tools.kube_utils import load_kube_api
@@ -82,8 +90,29 @@ class KuberefValidator(Validator):
# PLATFORM CHECKS
self.update_report(pod_health_check())
+ self.update_report(kubevirt_check())
+ self.update_report(helmv2_disabled_check())
+ self.update_report(capability_check())
+ self.update_report(privilege_check())
+ self.update_report(host_network_check())
+ self.update_report(host_path_vol_check())
+ self.update_report(k8s_api_conn_check())
+
+
+ # MONITORING & LOGGING AGENT CHECKS
+ self.update_report(monitoring_agent_check())
+ self.update_report(collectd_check())
+ self.update_report(node_exporter_check())
# COMPUTE CHECKS
+ self.update_report(cpu_manager_policy_check())
+ self.update_report(topology_manager_policy_check())
+
+
+ # NETWORK CHECKS
+ self.update_report(cni_plugin_check())
+ self.update_report(multi_interface_cni_check())
+
def get_report(self):
diff --git a/sdv/docker/sdvstate/internal/validator/kuberef/kubevirt_health_check.py b/sdv/docker/sdvstate/internal/validator/kuberef/kubevirt_health_check.py
new file mode 100644
index 0000000..08bb3c7
--- /dev/null
+++ b/sdv/docker/sdvstate/internal/validator/kuberef/kubevirt_health_check.py
@@ -0,0 +1,44 @@
+"""
+Kubevirt Check
+Checks the existence and health of kubevirt
+"""
+
+import logging
+from tools.kube_utils import kube_api
+from internal.checks.pod_health_check import pod_status, get_logs
+from internal.store_result import store_result
+
+def kubevirt_check():
+ """
+ Checks for the existence of the kubevirt namespace and checks the health of the pods within it
+
+ """
+ k8s_api = kube_api()
+ namespaces = k8s_api.list_namespace()
+ ns_names = []
+ for nspace in namespaces.items:
+ ns_names.append(nspace.metadata.name)
+
+ result = {'category': 'platform',
+ 'case_name': 'kubevirt_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ logger = logging.getLogger(__name__)
+
+ if 'kubevirt' in ns_names:
+ result['criteria'] = 'pass'
+ result['details'].append(ns_names)
+ pod_list = k8s_api.list_namespaced_pod('kubevirt')
+ for pod in pod_list.items:
+ pod_stats = pod_status(logger, pod)
+ if pod_stats['criteria'] == 'fail':
+ pod_stats['logs'] = get_logs(k8s_api, pod)
+ result['criteria'] = 'fail'
+ result['details'].append(pod_stats)
+ else:
+ result['criteria'] = 'fail'
+
+ store_result(logger, result)
+ return result
diff --git a/sdv/docker/sdvstate/internal/validator/kuberef/monitoring_agent_checker.py b/sdv/docker/sdvstate/internal/validator/kuberef/monitoring_agent_checker.py
new file mode 100644
index 0000000..bc94c33
--- /dev/null
+++ b/sdv/docker/sdvstate/internal/validator/kuberef/monitoring_agent_checker.py
@@ -0,0 +1,102 @@
+"""
+Monitoring agent checks
+Checks for prometheus and collectd existence and health
+"""
+
+import logging
+from tools.kube_utils import kube_api
+from internal.store_result import store_result
+from internal.checks.pod_health_check import pod_status, get_logs
+
+def health_checker(pod, api_instance, logger, result):
+ """
+ Checks the health of a pod
+ """
+ status = []
+ pod_stats = pod_status(logger, pod)
+
+ if pod_stats['criteria'] == 'fail':
+ pod_stats['logs'] = get_logs(api_instance, pod)
+ result['criteria'] = 'fail'
+
+ status.append(pod.metadata.name)
+ status.append(pod_stats)
+ return status
+
+def monitoring_agent_check():
+ """
+ Checks the existence & health of Prometheus pods
+ """
+ api_instance = kube_api()
+ namespaces = api_instance.list_namespace()
+ ns_names = []
+
+ for nspace in namespaces.items:
+ ns_names.append(nspace.metadata.name)
+
+ result = {'category': 'observability',
+ 'case_name': 'prometheus_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ status = []
+ flag = False
+ logger = logging.getLogger(__name__)
+ if 'monitoring' in ns_names:
+ pod_details = api_instance.list_namespaced_pod('monitoring', watch=False)
+ pods = pod_details.items
+ for pod in pods:
+ if 'prometheus' in pod.metadata.name:
+ stats = health_checker(pod, api_instance, logger, result)
+ status.append(stats)
+ flag = True
+ else:
+ for name in ns_names:
+ pod_details = api_instance.list_namespaced_pod(name, watch=False)
+ pods = pod_details.items
+ for pod in pods:
+ if 'prometheus' in pod.metadata.name:
+ stats = health_checker(pod, api_instance, logger, result)
+ status.append(stats)
+ flag = True
+
+ if flag is False:
+ result['criteria'] = 'fail'
+
+ result['details'].append(status)
+ store_result(logger, result)
+ return result
+
+
+def collectd_check():
+ """
+ Checks whether collectd pods are present and healthy
+ """
+ api_instance = kube_api()
+ pod_details = api_instance.list_pod_for_all_namespaces()
+ pods = pod_details.items
+
+ result = {'category': 'observability',
+ 'case_name': 'collectd_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ logger = logging.getLogger(__name__)
+
+ status = []
+
+ flag = False
+ for pod in pods:
+ if 'collectd' in pod.metadata.name:
+ stats = health_checker(pod, api_instance, logger, result)
+ status.append(stats)
+ flag = True
+
+ if flag is False:
+ result['criteria'] = 'fail'
+
+ result['details'].append(status)
+ store_result(logger, result)
+ return result
diff --git a/sdv/docker/sdvstate/internal/validator/kuberef/node_exporter_checker.py b/sdv/docker/sdvstate/internal/validator/kuberef/node_exporter_checker.py
new file mode 100644
index 0000000..7262fb1
--- /dev/null
+++ b/sdv/docker/sdvstate/internal/validator/kuberef/node_exporter_checker.py
@@ -0,0 +1,65 @@
+"""
+Node Exporter Check
+"""
+
+import logging
+from tools.kube_utils import kube_api
+from internal.checks.pod_health_check import pod_status, get_logs
+from internal.store_result import store_result
+
+
+def node_exporter_check():
+ """
+ Checks existence & health of node exporter pods
+ """
+ kube = kube_api()
+ namespaces = kube.list_namespace()
+ ns_names = []
+ for nspace in namespaces.items:
+ ns_names.append(nspace.metadata.name)
+
+ result = {'category': 'observability',
+ 'case_name': 'node_exporter_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ status = []
+
+ flag = False
+
+ logger = logging.getLogger(__name__)
+
+ if 'monitoring' in ns_names:
+ pod_list = kube.list_namespaced_pod('monitoring', watch=False)
+ pods = pod_list.items
+ for pod in pods:
+ if 'node-exporter' in pod.metadata.name:
+ pod_stats = pod_status(logger, pod)
+ if pod_stats['criteria'] == 'fail':
+ pod_stats['logs'] = get_logs(kube, pod)
+ result['criteria'] = 'fail'
+ status.append(pod.metadata.name)
+ status.append(pod_stats)
+ flag = True
+ else:
+ for nspace in namespaces.items:
+ pod_list = kube.list_namespaced_pod(nspace.metadata.name, watch=False)
+ pods = pod_list.items
+ for pod in pods:
+ if 'node-exporter' in pod.metadata.name:
+ pod_stats = pod_status(logger, pod)
+ if pod_stats['criteria'] == 'fail':
+ pod_stats['logs'] = get_logs(kube, pod)
+ result['criteria'] = 'fail'
+ status.append(pod.metadata.name)
+ status.append(pod_stats)
+ flag = True
+
+ if flag is False:
+ result['criteria'] = 'fail'
+
+ result['details'].append(status)
+
+ store_result(logger, result)
+ return result
diff --git a/sdv/docker/sdvstate/internal/validator/kuberef/plugin_check.py b/sdv/docker/sdvstate/internal/validator/kuberef/plugin_check.py
new file mode 100644
index 0000000..e964707
--- /dev/null
+++ b/sdv/docker/sdvstate/internal/validator/kuberef/plugin_check.py
@@ -0,0 +1,152 @@
+"""
+CNI Plugin Check
+Multi-interface CNI Check
+"""
+
+import time
+import logging
+from kubernetes import client
+from tools.kube_utils import kube_api, kube_exec
+from tools.conf import settings
+from internal.store_result import store_result
+
+def create_daemonset(apps_instance):
+ """
+ Creates a DaemonSet for the checks
+ """
+ manifest = {
+ 'apiVersion': 'apps/v1',
+ 'kind': 'DaemonSet',
+ 'metadata': {
+ 'name': 'plugin-check-test-set',
+ 'namespace': 'default'
+ },
+ 'spec': {
+ 'selector': {
+ 'matchLabels': {
+ 'name': 'alpine'
+ }
+ },
+ 'template': {
+ 'metadata': {
+ 'labels': {
+ 'name': 'alpine'
+ }
+ },
+ 'spec': {
+ 'containers': [{
+ 'name': 'alpine',
+ 'image': 'alpine:3.2',
+ 'command': ["sh", "-c", "echo \"Hello K8s\" && sleep 3600"],
+ 'volumeMounts': [{
+ 'name': 'etccni',
+ 'mountPath': '/etc/cni'
+ }, {
+ 'name': 'optcnibin',
+ 'mountPath': '/opt/cni/bin',
+ 'readOnly': True
+ }]
+ }],
+ 'volumes': [{
+ 'name': 'etccni',
+ 'hostPath': {
+ 'path': '/etc/cni'
+ }
+ }, {
+ 'name': 'optcnibin',
+ 'hostPath': {
+ 'path': '/opt/cni/bin'
+ }
+ }],
+ 'tolerations': [{
+ 'effect': 'NoSchedule',
+ 'key': 'node-role.kubernetes.io/master',
+ 'operator': 'Exists'
+ }]
+ }
+ }
+ }
+ }
+ apps_instance.create_namespaced_daemon_set('default', manifest)
+ time.sleep(6)
+
+
+def multi_interface_cni_check():
+ """
+ Checks if a multi-interface CNI is enabled
+ """
+ apps_instance = client.AppsV1Api()
+ api_instance = kube_api()
+ logger = logging.getLogger(__name__)
+
+ result = {'category': 'network',
+ 'case_name': 'multi_interface_cni_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ create_daemonset(apps_instance)
+ pod_details = api_instance.list_namespaced_pod('default', watch=False)
+ pods = pod_details.items
+ status = []
+ cmd = ['ls', '/etc/cni/net.d']
+
+ for pod in pods:
+ if 'plugin-check-test-set' in pod.metadata.name:
+ list_of_plugin_conf = kube_exec(pod, cmd)
+ list_of_plugin_conf = list_of_plugin_conf.split("\n")
+
+ cmd3 = ['cat', list_of_plugin_conf[0]]
+ multi_interface_conf = kube_exec(pod, cmd3)
+
+ if 'multus' not in multi_interface_conf:
+ result['criteria'] = 'fail'
+
+ status.append(list_of_plugin_conf)
+ status.append(multi_interface_conf)
+
+ apps_instance.delete_namespaced_daemon_set('plugin-check-test-set', 'default')
+ result['details'].append(status)
+ store_result(logger, result)
+ return result
+
+def cni_plugin_check():
+ """
+ Checks for CNI plugins and validates them against the PDF
+ """
+ apps_instance = client.AppsV1Api()
+ api_instance = kube_api()
+
+ result = {'category': 'network',
+ 'case_name': 'cni_plugin_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ logger = logging.getLogger(__name__)
+ create_daemonset(apps_instance)
+ pod_details = api_instance.list_namespaced_pod('default', watch=False)
+ pods = pod_details.items
+ daemon_pods = []
+ status = []
+ cmd = ['ls', '/opt/cni/bin']
+ cni_plugins = settings.getValue('pdf_file')['vim_functional']['cnis_supported']
+
+
+ for pod in pods:
+ if 'plugin-check-test-set' in pod.metadata.name:
+ list_of_cni_from_dir = kube_exec(pod, cmd)
+
+ for plugin in cni_plugins:
+ if plugin not in list_of_cni_from_dir:
+ result['criteria'] = 'fail'
+
+ status.append(list_of_cni_from_dir)
+ daemon_pods.append(pod.metadata.name)
+
+ apps_instance.delete_namespaced_daemon_set('plugin-check-test-set', 'default')
+
+ result['details'].append(daemon_pods)
+ result['details'].append(status)
+ store_result(logger, result)
+ return result
diff --git a/sdv/docker/sdvstate/internal/validator/kuberef/policy_checks.py b/sdv/docker/sdvstate/internal/validator/kuberef/policy_checks.py
new file mode 100644
index 0000000..6993fd7
--- /dev/null
+++ b/sdv/docker/sdvstate/internal/validator/kuberef/policy_checks.py
@@ -0,0 +1,123 @@
+"""
+Policy Checks
+Checks if the policies are properly configured
+"""
+
+
+import ast
+import logging
+from tools.kube_utils import kube_api
+from tools.conf import settings
+from internal.store_result import store_result
+
+def cpu_manager_policy_check():
+ """
+ Checks cpu manager settings
+ """
+ api = kube_api()
+ logger = logging.getLogger(__name__)
+ node_list = api.list_node()
+ nodes = []
+
+ for node in node_list.items:
+ nodes.append(node.metadata.name)
+
+ result = {'category': 'compute',
+ 'case_name': 'cpu_manager_policy_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ for node in nodes:
+ configz = api.connect_get_node_proxy_with_path(node, "configz")
+ configz = ast.literal_eval(configz)
+ res = {
+ 'node': node,
+ 'criteria': 'pass',
+ 'config': []
+ }
+
+ status = []
+
+ flag = True
+
+ cpu_manager = settings.getValue('pdf_file')['vim_functional']['cpu_manager_policy']
+
+ if cpu_manager['type'] == configz['kubeletconfig']['cpuManagerPolicy']:
+ if cpu_manager['type'] == 'static':
+ if cpu_manager['reconcile_period'] == configz['kubeletconfig']['cpuManagerReconcilePeriod']:
+ if cpu_manager['full_pcpus'] == configz['kubeletconfig']['full-pcpus-only']:
+ flag = flag and True
+ else:
+ flag = flag and False
+ else:
+ flag = flag and True
+ else:
+ flag = flag and False
+
+ if flag is False:
+ res['criteria'] = 'fail'
+
+ status.append(cpu_manager)
+ res['config'] = status
+ result['details'].append(res)
+
+
+ if flag is False:
+ result['criteria'] = 'fail'
+
+ store_result(logger, result)
+ return result
+
+def topology_manager_policy_check():
+ """
+ Checks topology manager settings
+ """
+ api = kube_api()
+ logger = logging.getLogger(__name__)
+ node_list = api.list_node()
+ nodes = []
+
+ for node in node_list.items:
+ nodes.append(node.metadata.name)
+
+
+ result = {
+ 'category': 'compute',
+ 'case_name': 'topology_manager_policy_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ for node in nodes:
+ configz = api.connect_get_node_proxy_with_path(node, "configz")
+ configz = ast.literal_eval(configz)
+ res = {
+ 'node': node,
+ 'criteria': 'pass',
+ 'config': []
+ }
+
+ status = []
+
+ flag = True
+
+ topology_manager = settings.getValue('pdf_file')['undercloud_ook']['topo_manager_policy']
+
+ if topology_manager['type'] == configz['kubeletconfig']['topologyManagerPolicy']:
+ if topology_manager['scope'] == configz['kubeletconfig']['topologyManagerScope']:
+ flag = flag and True
+ else:
+ flag = flag and False
+ if flag is False:
+ res['criteria'] = 'fail'
+
+ status.append(topology_manager)
+ res['config'] = status
+ result['details'].append(res)
+
+ if flag is False:
+ result['criteria'] = 'fail'
+
+ store_result(logger, result)
+ return result
diff --git a/sdv/docker/sdvstate/internal/validator/kuberef/security_check.py b/sdv/docker/sdvstate/internal/validator/kuberef/security_check.py
new file mode 100644
index 0000000..f49048c
--- /dev/null
+++ b/sdv/docker/sdvstate/internal/validator/kuberef/security_check.py
@@ -0,0 +1,272 @@
+"""
+Security Checks
+"""
+
+import time
+import logging
+from tools.kube_utils import kube_api, kube_curl
+from tools.kube_utils import kube_exec
+from internal.store_result import store_result
+
+# capability check
+def capability_check():
+ """
+ Checks if creation of pods with particular capabilities is possible
+ """
+ kube = kube_api()
+ logger = logging.getLogger(__name__)
+ pod_manifest = {
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {
+ 'name': 'security-capability-demo',
+ },
+ 'spec': {
+ 'containers': [{
+ 'image': 'alpine:3.2',
+ 'name': 'security-capability-demo',
+ 'command': ["/bin/sh", "-c", "sleep 60m"],
+ 'securityContext': {
+ 'capabilities': {
+ 'drop': [
+ "ALL"
+ ],
+ 'add': [
+ 'NET_ADMIN', 'NET_RAW'
+ ]
+ }
+ }
+ }]
+ }
+ }
+ result = {'category': 'platform',
+ 'case_name': 'capability_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+ status = []
+ try:
+ pod_cap = kube.create_namespaced_pod(body=pod_manifest, namespace='default')
+ time.sleep(6)
+ cmd = ['cat', '/proc/1/status']
+
+ response = kube_exec(pod_cap, cmd)
+ if "0000000000003000" in response:  # CapEff mask 0x3000 = CAP_NET_ADMIN | CAP_NET_RAW
+ result['criteria'] = 'fail'
+ status.append(pod_cap)
+ kube.delete_namespaced_pod(name=pod_cap.metadata.name, namespace='default')
+
+ except KeyError as error:
+ status.append(error)
+
+ except RuntimeError as error:
+ status.append(error)
+
+ result['details'].append(status)
+ store_result(logger, result)
+ return result
+
+# privileges check
+def privilege_check():
+ """
+ Checks if privileged pods can be created
+ """
+ kube = kube_api()
+ logger = logging.getLogger(__name__)
+
+ pod_manifest = {
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {
+ 'name': 'security-privileges-demo',
+ },
+ 'spec': {
+ 'containers': [{
+ 'image': 'alpine:3.2',
+ 'name': 'security-privileges-demo',
+ 'command': ["/bin/sh", "-c", "sleep 60m"],
+ 'securityContext': {
+ 'privileged': True
+ }
+ }]
+ }
+ }
+ result = {'category': 'platform',
+ 'case_name': 'privilege_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ status = []
+
+ try:
+ pod_priv = kube.create_namespaced_pod(body=pod_manifest, namespace='default')
+ time.sleep(5)
+ cmd = ['ps', 'aux']
+
+ response = kube_exec(pod_priv, cmd)
+
+ if "root" in response:
+ result['criteria'] = 'fail'
+ status.append(response)
+
+ kube.delete_namespaced_pod(name=pod_priv.metadata.name, namespace='default')
+
+ except KeyError as error:
+ status.append(error)
+
+ except RuntimeError as error:
+ status.append(error)
+
+ result['details'].append(status)
+
+ store_result(logger, result)
+ return result
+
+# host network check
+def host_network_check():
+ """
+ Checks if the pods can share the network with their host
+ """
+ kube = kube_api()
+ logger = logging.getLogger(__name__)
+
+ pod_manifest = {
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {
+ 'name': 'security-host-network-demo',
+ },
+ 'spec': {
+ 'hostNetwork': True,
+ 'containers': [{
+ 'image': 'k8s.gcr.io/pause',
+ 'name': 'security-host-network-demo',
+ 'command': ["/bin/sh", "-c", "sleep 60m"],
+ }],
+ 'restartPolicy': 'Always'
+ }
+ }
+ result = {'category': 'platform',
+ 'case_name': 'host_network_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ status = []
+
+ try:
+ pod_nw = kube.create_namespaced_pod(body=pod_manifest, namespace='default')
+ time.sleep(5)
+
+ kube.delete_namespaced_pod(name=pod_nw.metadata.name, namespace='default')
+ result['criteria'] = 'fail'  # the hostNetwork pod was admitted, so host networking is not blocked
+
+ except KeyError as error:
+ status.append(error)
+
+ except RuntimeError as error:
+ status.append(error)
+
+ result['details'].append(status)
+
+ store_result(logger, result)
+ return result
+
+# host directory as a volume check
+def host_path_vol_check():
+ """
+ Checks if a host directory can be mounted as a volume in a pod
+ """
+ kube = kube_api()
+ logger = logging.getLogger(__name__)
+
+ pod_manifest = {
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {
+ 'name': 'security-host-path-volume-demo',
+ },
+ 'spec': {
+ 'hostNetwork': True,
+ 'containers': [{
+ 'image': 'k8s.gcr.io/pause',
+ 'name': 'security-host-path-volume-demo',
+ 'command': ["/bin/sh", "-c", "sleep 60m"],
+ }],
+ 'volumes': [
+ {
+ 'name': 'test-vol',
+ 'hostPath': {
+ 'path': 'home',
+ 'type': 'Directory'
+ }
+ }
+ ]
+ }
+ }
+ result = {'category': 'platform',
+ 'case_name': 'host_path_dir_vol_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ status = []
+
+ try:
+ pod_vol = kube.create_namespaced_pod(body=pod_manifest, namespace='default')
+
+ time.sleep(5)
+
+ kube.delete_namespaced_pod(name=pod_vol.metadata.name, namespace='default')
+ result['criteria'] = 'fail'
+
+ except KeyError as error:
+ status.append(error)
+
+ except RuntimeError as error:
+ status.append(error)
+
+ result['details'].append(status)
+
+ store_result(logger, result)
+ return result
+
+# kubernetes api connectivity check
+def k8s_api_conn_check():
+ """
+ Checks Kubernetes API connectivity from within a pod
+ """
+
+ result = {'category': 'platform',
+ 'case_name': 'connectivity_check',
+ 'criteria': 'pass',
+ 'details': []
+ }
+
+ status = []
+ logger = logging.getLogger(__name__)
+
+ try:
+ ca_crt = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
+ auth_tkn = '"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"'
+ url = 'https://kubernetes.default.svc'
+ response = kube_curl('-v', '--cacert', ca_crt, '-H', auth_tkn, url)
+
+ if "Connected to kubernetes" in response:
+ result['criteria'] = 'pass'
+ else:
+ result['criteria'] = 'fail'
+
+ status.append(response)
+
+ except ConnectionError as error:
+ status.append(error)
+
+ except RuntimeError as error:
+ status.append(error)
+
+ result['details'].append(status)
+
+ store_result(logger, result)
+ return result