aboutsummaryrefslogtreecommitdiffstats
path: root/functest_kubernetes
diff options
context:
space:
mode:
authorCédric Ollivier <cedric.ollivier@orange.com>2020-08-13 12:51:55 +0200
committerCédric Ollivier <cedric.ollivier@orange.com>2020-08-13 12:54:19 +0200
commit8e2a7dbee8f134dbe9022683d40e2328e5e50fe6 (patch)
tree2950002cc16821d2e0d7cfdc12f1eb22296d71d8 /functest_kubernetes
parentfac999583784f6e3b2c64294f2347fdf8a7a64f0 (diff)
Make K8s security tests namespace aware
It now creates a namespace to allow running the test cases twice in parallel. It also overprotects clean operations to force a full delete. Change-Id: Ie0becd8ea9126328e7280591bacc0d88e14dd031 Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
Diffstat (limited to 'functest_kubernetes')
-rw-r--r--functest_kubernetes/security/security.py43
1 files changed, 28 insertions, 15 deletions
diff --git a/functest_kubernetes/security/security.py b/functest_kubernetes/security/security.py
index b878cb77..2051f956 100644
--- a/functest_kubernetes/security/security.py
+++ b/functest_kubernetes/security/security.py
@@ -27,7 +27,6 @@ from xtesting.core import testcase
class SecurityTesting(testcase.TestCase):
# pylint: disable=too-many-instance-attributes
"""Run Security job"""
- namespace = 'default'
watch_timeout = 1200
__logger = logging.getLogger(__name__)
@@ -41,6 +40,7 @@ class SecurityTesting(testcase.TestCase):
self.job_name = None
self.output_log_name = 'functest-kubernetes.log'
self.output_debug_log_name = 'functest-kubernetes.debug.log'
+ self.namespace = ""
def deploy_job(self):
"""Run Security job
@@ -49,12 +49,17 @@ class SecurityTesting(testcase.TestCase):
"""
assert self.job_name
+ api_response = self.corev1.create_namespace(
+ client.V1Namespace(metadata=client.V1ObjectMeta(
+ generate_name="ims-")))
+ self.namespace = api_response.metadata.name
+ self.__logger.debug("create_namespace: %s", api_response)
with open(pkg_resources.resource_filename(
"functest_kubernetes",
"security/{}.yaml".format(self.job_name))) as yfile:
body = yaml.safe_load(yfile)
api_response = self.batchv1.create_namespaced_job(
- body=body, namespace="default")
+ body=body, namespace=self.namespace)
self.__logger.info("Job %s created", api_response.metadata.name)
self.__logger.debug("create_namespaced_job: %s", api_response)
watch_job = watch.Watch()
@@ -85,19 +90,27 @@ class SecurityTesting(testcase.TestCase):
self.stop_time = time.time()
def clean(self):
- try:
- api_response = self.corev1.delete_namespaced_pod(
- name=self.pod, namespace=self.namespace)
- self.__logger.debug("delete_namespaced_pod: %s", api_response)
- except client.rest.ApiException:
- pass
- try:
- api_response = self.batchv1.delete_namespaced_job(
- name=self.job_name, namespace=self.namespace)
- self.__logger.debug(
- "delete_namespaced_deployment: %s", api_response)
- except client.rest.ApiException:
- pass
+ if self.pod:
+ try:
+ api_response = self.corev1.delete_namespaced_pod(
+ name=self.pod, namespace=self.namespace)
+ self.__logger.debug("delete_namespaced_pod: %s", api_response)
+ except client.rest.ApiException:
+ pass
+ if self.job_name:
+ try:
+ api_response = self.batchv1.delete_namespaced_job(
+ name=self.job_name, namespace=self.namespace)
+ self.__logger.debug(
+ "delete_namespaced_deployment: %s", api_response)
+ except client.rest.ApiException:
+ pass
+ if self.namespace:
+ try:
+ api_response = self.corev1.delete_namespace(self.namespace)
+ self.__logger.debug("delete_namespace: %s", self.namespace)
+ except client.rest.ApiException:
+ pass
class KubeHunter(SecurityTesting):