aboutsummaryrefslogtreecommitdiffstats
path: root/functest/opnfv_tests/openstack/api/connection_check.py
diff options
context:
space:
mode:
Diffstat (limited to 'functest/opnfv_tests/openstack/api/connection_check.py')
-rw-r--r--functest/opnfv_tests/openstack/api/connection_check.py73
1 files changed, 73 insertions, 0 deletions
diff --git a/functest/opnfv_tests/openstack/api/connection_check.py b/functest/opnfv_tests/openstack/api/connection_check.py
new file mode 100644
index 000000000..eaf9767c0
--- /dev/null
+++ b/functest/opnfv_tests/openstack/api/connection_check.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Verify the connection to OpenStack Services"""
+
+import logging
+import time
+
+import os_client_config
+import shade
+from xtesting.core import testcase
+
+from functest.utils import env
+from functest.utils import functest_utils
+
+
+class ConnectionCheck(testcase.TestCase):
+ """Perform simplest queries"""
+ __logger = logging.getLogger(__name__)
+
+ func_list = [
+ "get_network_extensions", "list_aggregates", "list_domains",
+ "list_endpoints", "list_floating_ip_pools", "list_floating_ips",
+ "list_hypervisors", "list_keypairs", "list_networks", "list_ports",
+ "list_role_assignments", "list_roles", "list_routers", "list_servers",
+ "list_subnets"]
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'connection_check'
+ super().__init__(**kwargs)
+ self.output_log_name = 'functest.log'
+ self.output_debug_log_name = 'functest.debug.log'
+ try:
+ cloud_config = os_client_config.get_config()
+ self.cloud = shade.OpenStackCloud(cloud_config=cloud_config)
+ except Exception: # pylint: disable=broad-except
+ self.cloud = None
+
+ def run(self, **kwargs):
+ # pylint: disable=protected-access
+ """Run all read operations to check connections"""
+ status = testcase.TestCase.EX_RUN_ERROR
+ try:
+ assert self.cloud
+ self.start_time = time.time()
+ self.__logger.debug(
+ "list_services: %s", functest_utils.list_services(self.cloud))
+ if env.get('NO_TENANT_NETWORK').lower() == 'true':
+ self.func_list.remove("list_floating_ip_pools")
+ self.func_list.remove("list_floating_ips")
+ self.func_list.remove("list_routers")
+ for func in self.func_list:
+ self.__logger.debug(
+ "%s: %s", func, getattr(self.cloud, func)())
+ data = self.cloud._network_client.get("/service-providers.json")
+ self.__logger.debug(
+ "list_service_providers: %s",
+ self.cloud._get_and_munchify('service_providers', data))
+ functest_utils.get_openstack_version(self.cloud)
+ self.result = 100
+ status = testcase.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception('Cannot run %s', self.case_name)
+ finally:
+ self.stop_time = time.time()
+ return status