aboutsummaryrefslogtreecommitdiffstats
path: root/functest/opnfv_tests
diff options
context:
space:
mode:
Diffstat (limited to 'functest/opnfv_tests')
-rw-r--r--functest/opnfv_tests/vnf/router/cloudify_vrouter.py59
1 files changed, 23 insertions, 36 deletions
diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
index 1596860d..020085ba 100644
--- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
+++ b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
@@ -18,12 +18,12 @@ import time
from cloudify_rest_client import CloudifyClient
from cloudify_rest_client.executions import Execution
from scp import SCPClient
-import yaml
from functest.opnfv_tests.openstack.snaps import snaps_utils
import functest.opnfv_tests.vnf.router.vrouter_base as vrouter_base
from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
from functest.utils.constants import CONST
+from functest.utils import functest_utils
from git import Repo
@@ -79,35 +79,46 @@ class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase):
config_file = os.path.join(self.case_dir, self.config)
self.orchestrator = dict(
- requirements=get_config("orchestrator.requirements", config_file),
+ requirements=functest_utils.get_parameter_from_yaml(
+ "orchestrator.requirements", config_file),
)
self.details['orchestrator'] = dict(
- name=get_config("orchestrator.name", config_file),
- version=get_config("orchestrator.version", config_file),
+ name=functest_utils.get_parameter_from_yaml(
+ "orchestrator.name", config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "orchestrator.version", config_file),
status='ERROR',
result=''
)
self.__logger.debug("Orchestrator configuration %s", self.orchestrator)
self.__logger.debug("name = %s", self.name)
self.vnf = dict(
- descriptor=get_config("vnf.descriptor", config_file),
- inputs=get_config("vnf.inputs", config_file),
- requirements=get_config("vnf.requirements", config_file)
+ descriptor=functest_utils.get_parameter_from_yaml(
+ "vnf.descriptor", config_file),
+ inputs=functest_utils.get_parameter_from_yaml(
+ "vnf.inputs", config_file),
+ requirements=functest_utils.get_parameter_from_yaml(
+ "vnf.requirements", config_file)
)
self.details['vnf'] = dict(
descriptor_version=self.vnf['descriptor']['version'],
- name=get_config("vnf.name", config_file),
- version=get_config("vnf.version", config_file),
+ name=functest_utils.get_parameter_from_yaml(
+ "vnf.name", config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "vnf.version", config_file),
)
self.__logger.debug("VNF configuration: %s", self.vnf)
self.util = Utilvnf()
self.details['test_vnf'] = dict(
- name=get_config("vnf_test_suite.name", config_file),
- version=get_config("vnf_test_suite.version", config_file)
+ name=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.name", config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.version", config_file)
)
- self.images = get_config("tenant_images", config_file)
+ self.images = functest_utils.get_parameter_from_yaml(
+ "tenant_images", config_file)
self.__logger.info("Images needed for vrouter: %s", self.images)
def prepare(self):
@@ -433,30 +444,6 @@ class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase):
self.cfy_manager_ip, self.deployment_name, target_vnf_name)
-# ----------------------------------------------------------
-#
-# YAML UTILS
-#
-# -----------------------------------------------------------
-def get_config(parameter, file_path):
- """
- Get config parameter.
- Returns the value of a given parameter in file.yaml
- parameter must be given in string format with dots
- Example: general.openstack.image_name
- """
- with open(file_path) as config_file:
- file_yaml = yaml.safe_load(config_file)
- config_file.close()
- value = file_yaml
- for element in parameter.split("."):
- value = value.get(element)
- if value is None:
- raise ValueError("The parameter %s is not defined in"
- " reporting.yaml" % parameter)
- return value
-
-
def wait_for_execution(client, execution, logger, timeout=7200, ):
"""Wait for a workflow execution on Cloudify Manager."""
# if execution already ended - return without waiting