aboutsummaryrefslogtreecommitdiffstats
path: root/functest/opnfv_tests/vnf
diff options
context:
space:
mode:
Diffstat (limited to 'functest/opnfv_tests/vnf')
-rw-r--r--functest/opnfv_tests/vnf/epc/juju_epc.py704
-rw-r--r--functest/opnfv_tests/vnf/ims/clearwater.py190
-rw-r--r--functest/opnfv_tests/vnf/ims/clearwater_ims_base.py183
-rw-r--r--functest/opnfv_tests/vnf/ims/cloudify_ims.py579
-rw-r--r--functest/opnfv_tests/vnf/ims/cloudify_ims.yaml29
-rw-r--r--functest/opnfv_tests/vnf/ims/heat_ims.py253
-rw-r--r--functest/opnfv_tests/vnf/ims/heat_ims.yaml22
-rw-r--r--functest/opnfv_tests/vnf/router/cloudify_vrouter.py440
-rw-r--r--functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml3
-rw-r--r--functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py20
-rw-r--r--functest/opnfv_tests/vnf/router/utilvnf.py129
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/checker.py2
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py2
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py8
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py28
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py24
-rw-r--r--functest/opnfv_tests/vnf/router/vrouter_base.py34
17 files changed, 1047 insertions, 1603 deletions
diff --git a/functest/opnfv_tests/vnf/epc/juju_epc.py b/functest/opnfv_tests/vnf/epc/juju_epc.py
index 3f2a9ff93..1cf240b80 100644
--- a/functest/opnfv_tests/vnf/epc/juju_epc.py
+++ b/functest/opnfv_tests/vnf/epc/juju_epc.py
@@ -14,35 +14,16 @@ import os
import time
import json
import re
-import subprocess
import sys
-import uuid
+
from copy import deepcopy
import pkg_resources
-import yaml
-
-import six
-from snaps.config.flavor import FlavorConfig
-from snaps.config.image import ImageConfig
-from snaps.config.network import NetworkConfig, SubnetConfig
-from snaps.config.router import RouterConfig
-from snaps.config.security_group import (
- Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig)
-from snaps.config.user import UserConfig
-from snaps.openstack.create_flavor import OpenStackFlavor
-from snaps.openstack.create_image import OpenStackImage
-from snaps.openstack.create_network import OpenStackNetwork
-from snaps.openstack.create_router import OpenStackRouter
-from snaps.openstack.create_security_group import OpenStackSecurityGroup
-from snaps.openstack.create_user import OpenStackUser
-from snaps.openstack.utils import keystone_utils
-from snaps.openstack.utils import nova_utils
-from snaps.openstack.utils import neutron_utils
-
-from functest.core import vnf
-from functest.opnfv_tests.openstack.snaps import snaps_utils
+import scp
+
+from functest.core import singlevm
from functest.utils import config
from functest.utils import env
+from functest.utils import functest_utils
__author__ = "Amarendra Meher <amarendra@rebaca.com>"
__author__ = "Soumaya K Nayek <soumaya.nayek@rebaca.com>"
@@ -61,188 +42,186 @@ CREDS_TEMPLATE2 = """credentials:
default-credential: abot-epc
abot-epc:
auth-type: userpass
- password: {pass}
+ password: '{pass}'
project-domain-name: {project_domain_n}
tenant-name: {tenant_n}"""
-CREDS_TEMPLATE3 = """credentials:
+CREDS_TEMPLATE = """credentials:
abot-epc:
default-credential: abot-epc
abot-epc:
auth-type: userpass
- password: {pass}
+ password: '{pass}'
project-domain-name: {project_domain_n}
tenant-name: {tenant_n}
user-domain-name: {user_domain_n}
username: {user_n}"""
-class JujuEpc(vnf.VnfOnBoarding):
+class JujuEpc(singlevm.SingleVm2):
# pylint:disable=too-many-instance-attributes
"""Abot EPC deployed with JUJU Orchestrator Case"""
__logger = logging.getLogger(__name__)
- juju_timeout = '3600'
+ cidr = '192.168.120.0/24'
+
+ filename = ('/home/opnfv/functest/images/'
+ 'ubuntu-16.04-server-cloudimg-amd64-disk1.img')
+ filename_alt = ('/home/opnfv/functest/images/'
+ 'ubuntu-14.04-server-cloudimg-amd64-disk1.img')
+
+ flavor_ram = 2048
+ flavor_vcpus = 1
+ flavor_disk = 10
+ flavor_alt_ram = 4096
+ flavor_alt_vcpus = 1
+ flavor_alt_disk = 10
+ username = 'ubuntu'
+ juju_timeout = '4800'
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "juju_epc"
- super(JujuEpc, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/epc')
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ config.CONF, f'vnf_{self.case_name}_config')
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.config_file = os.path.join(self.case_dir, self.config)
- self.orchestrator = dict(requirements=get_config(
- "orchestrator.requirements", self.config_file))
+ self.orchestrator = dict(
+ requirements=functest_utils.get_parameter_from_yaml(
+ "orchestrator.requirements", self.config_file))
self.created_object = []
self.details['orchestrator'] = dict(
- name=get_config("orchestrator.name", self.config_file),
- version=get_config("orchestrator.version", self.config_file),
+ name=functest_utils.get_parameter_from_yaml(
+ "orchestrator.name", self.config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "orchestrator.version", self.config_file),
status='ERROR',
result=''
)
self.vnf = dict(
- descriptor=get_config("vnf.descriptor", self.config_file),
- requirements=get_config("vnf.requirements", self.config_file)
+ descriptor=functest_utils.get_parameter_from_yaml(
+ "vnf.descriptor", self.config_file),
+ requirements=functest_utils.get_parameter_from_yaml(
+ "vnf.requirements", self.config_file)
)
self.details['vnf'] = dict(
descriptor_version=self.vnf['descriptor']['version'],
- name=get_config("vnf.name", self.config_file),
- version=get_config("vnf.version", self.config_file),
+ name=functest_utils.get_parameter_from_yaml(
+ "vnf.name", self.config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "vnf.version", self.config_file),
)
self.__logger.debug("VNF configuration: %s", self.vnf)
self.details['test_vnf'] = dict(
- name=get_config("vnf_test_suite.name", self.config_file),
- version=get_config("vnf_test_suite.version", self.config_file),
- tag_name=get_config("vnf_test_suite.tag_name", self.config_file)
+ name=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.name", self.config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.version", self.config_file),
+ tag_name=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.tag_name", self.config_file)
)
- self.public_auth_url = None
self.res_dir = os.path.join(
getattr(config.CONF, 'dir_results'), self.case_name)
- def _bypass_juju_netdiscovery_bug(self, name):
- user_creator = OpenStackUser(
- self.snaps_creds,
- UserConfig(
- name=name,
- password=str(uuid.uuid4()),
- project_name=self.tenant_name,
- domain_name=self.snaps_creds.user_domain_name,
- roles={'_member_': self.tenant_name}))
- user_creator.create()
- self.created_object.append(user_creator)
- return user_creator
+ try:
+ self.public_auth_url = self.get_public_auth_url(self.orig_cloud)
+ if not self.public_auth_url.endswith(('v3', 'v3/')):
+ self.public_auth_url = f"{self.public_auth_url}/v3"
+ except Exception: # pylint: disable=broad-except
+ self.public_auth_url = None
+ self.sec = None
+ self.image_alt = None
+ self.flavor_alt = None
+
+ def _install_juju(self):
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'sudo snap install juju --channel=2.3/stable --classic')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def _install_juju_wait(self):
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'sudo apt-get update && sudo apt-get install python3-pip -y && '
+ 'sudo pip3 install juju_wait===2.6.4')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def _register_cloud(self):
+ assert self.public_auth_url
self.__logger.info("Creating Cloud for Abot-epc .....")
clouds_yaml = os.path.join(self.res_dir, "clouds.yaml")
cloud_data = {
'url': self.public_auth_url,
- 'region': self.snaps_creds.region_name if (
- self.snaps_creds.region_name) else 'RegionOne'}
- with open(clouds_yaml, 'w') as yfile:
+ 'region': self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')}
+ with open(clouds_yaml, 'w', encoding='utf-8') as yfile:
yfile.write(CLOUD_TEMPLATE.format(**cloud_data))
- cmd = ['juju', 'add-cloud', 'abot-epc', '-f', clouds_yaml, '--replace']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- def _register_credentials_v2(self):
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(clouds_yaml, remote_path='~/')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju add-cloud abot-epc -f clouds.yaml --replace')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def _register_credentials(self):
self.__logger.info("Creating Credentials for Abot-epc .....")
- user_creator = self._bypass_juju_netdiscovery_bug(
- 'juju_network_discovery_bug')
- snaps_creds = user_creator.get_os_creds(self.snaps_creds.project_name)
- self.__logger.debug("snaps creds: %s", snaps_creds)
credentials_yaml = os.path.join(self.res_dir, "credentials.yaml")
creds_data = {
- 'pass': snaps_creds.password,
- 'tenant_n': snaps_creds.project_name,
- 'user_n': snaps_creds.username}
- with open(credentials_yaml, 'w') as yfile:
- yfile.write(CREDS_TEMPLATE2.format(**creds_data))
- cmd = ['juju', 'add-credential', 'abot-epc', '-f', credentials_yaml,
- '--replace']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- def _register_credentials_v3(self):
- self.__logger.info("Creating Credentials for Abot-epc .....")
- user_creator = self._bypass_juju_netdiscovery_bug(
- 'juju_network_discovery_bug')
- snaps_creds = user_creator.get_os_creds(self.snaps_creds.project_name)
- self.__logger.debug("snaps creds: %s", snaps_creds)
- credentials_yaml = os.path.join(self.res_dir, "credentials.yaml")
- creds_data = {
- 'pass': snaps_creds.password,
- 'tenant_n': snaps_creds.project_name,
- 'user_n': snaps_creds.username,
- 'project_domain_n': snaps_creds.project_domain_name,
- 'user_domain_n': snaps_creds.user_domain_name}
- with open(credentials_yaml, 'w') as yfile:
- yfile.write(CREDS_TEMPLATE3.format(**creds_data))
- cmd = ['juju', 'add-credential', 'abot-epc', '-f', credentials_yaml,
- '--replace']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- def _add_custom_rule(self, sec_grp_name):
- """ To add custom rule for SCTP Traffic """
-
- security_group = OpenStackSecurityGroup(
- self.snaps_creds,
- SecurityGroupConfig(
- name=sec_grp_name))
-
- security_group.create()
-
- # Add custom security rule to the obtained Security Group
- self.__logger.info("Adding SCTP ingress rule to SG:%s",
- security_group.sec_grp_settings.name)
-
- try:
- security_group.add_rule(SecurityGroupRuleConfig(
- sec_grp_name=sec_grp_name, direction=Direction.ingress,
- protocol=Protocol.sctp))
- except Exception: # pylint: disable=broad-except
- self.__logger.exception(
- "Some issue encountered with adding SCTP security rule ...")
-
- def prepare(self):
- """Prepare testcase (Additional pre-configuration steps)."""
- self.__logger.info("Additional pre-configuration steps")
- super(JujuEpc, self).prepare()
- try:
- os.makedirs(self.res_dir)
- except OSError as ex:
- if ex.errno != errno.EEXIST:
- self.__logger.exception("Cannot create %s", self.res_dir)
- raise vnf.VnfPreparationException
-
- self.__logger.info("ENV:\n%s", env.string())
-
- self.public_auth_url = keystone_utils.get_endpoint(
- self.snaps_creds, 'identity')
-
- # it enforces a versioned public identity endpoint as juju simply
- # adds /auth/tokens wich fails vs an unversioned endpoint.
- if not self.public_auth_url.endswith(('v3', 'v3/', 'v2.0', 'v2.0/')):
- self.public_auth_url = six.moves.urllib.parse.urljoin(
- self.public_auth_url, 'v3')
- self._register_cloud()
- if self.snaps_creds.identity_api_version == 3:
- self._register_credentials_v3()
- else:
- self._register_credentials_v2()
+ 'pass': self.project.password,
+ 'tenant_n': self.project.project.name,
+ 'user_n': self.project.user.name,
+ 'project_domain_n': self.cloud.auth.get(
+ "project_domain_name", "Default"),
+ 'user_domain_n': self.cloud.auth.get(
+ "user_domain_name", "Default")}
+ with open(credentials_yaml, 'w', encoding='utf-8') as yfile:
+ yfile.write(CREDS_TEMPLATE.format(**creds_data))
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(credentials_yaml, remote_path='~/')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju add-credential abot-epc -f credentials.yaml '
+ ' --replace --debug')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def _publish_image(self):
+ region_name = self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju metadata generate-image -d /home/ubuntu '
+ f'-i {self.image.id} -s xenial -r {region_name} '
+ f'-u {self.public_auth_url}')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def publish_image_alt(self, name=None):
+ image_alt = super().publish_image_alt(name)
+ region_name = self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju metadata generate-image -d /home/ubuntu '
+ f'-i {image_alt.id} -s trusty -r {region_name} '
+ f'-u {self.public_auth_url}')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return image_alt
def deploy_orchestrator(self): # pylint: disable=too-many-locals
"""
@@ -250,204 +229,128 @@ class JujuEpc(vnf.VnfOnBoarding):
Bootstrap juju
"""
- self.__logger.info("Deploying Juju Orchestrator")
- private_net_name = getattr(
- config.CONF, 'vnf_{}_private_net_name'.format(self.case_name))
- private_subnet_name = '{}-{}'.format(
- getattr(config.CONF,
- 'vnf_{}_private_subnet_name'.format(self.case_name)),
- self.uuid)
- private_subnet_cidr = getattr(
- config.CONF, 'vnf_{}_private_subnet_cidr'.format(self.case_name))
- abot_router = '{}-{}'.format(
- getattr(config.CONF,
- 'vnf_{}_external_router'.format(self.case_name)),
- self.uuid)
- self.__logger.info("Creating full network with nameserver: %s",
- env.get('NAMESERVER'))
- subnet_settings = SubnetConfig(
- name=private_subnet_name,
- cidr=private_subnet_cidr,
- dns_nameservers=[env.get('NAMESERVER')])
- network_settings = NetworkConfig(
- name=private_net_name, subnet_settings=[subnet_settings])
- network_creator = OpenStackNetwork(self.snaps_creds, network_settings)
- net_id = network_creator.create().id
- self.created_object.append(network_creator)
-
- ext_net_name = snaps_utils.get_ext_net_name(self.snaps_creds)
- self.__logger.info("Creating network Router ....")
- router_creator = OpenStackRouter(
- self.snaps_creds, RouterConfig(
- name=abot_router,
- external_gateway=ext_net_name,
- internal_subnets=[subnet_settings.name]))
- router_creator.create()
- self.created_object.append(router_creator)
- self.__logger.info("Creating Flavor ....")
- flavor_settings = FlavorConfig(
- name=self.orchestrator['requirements']['flavor']['name'],
- ram=self.orchestrator['requirements']['flavor']['ram_min'],
- disk=10, vcpus=1)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor_creator.create()
- self.created_object.append(flavor_creator)
-
- self.__logger.info("Upload some OS images if it doesn't exist")
- images = get_config("tenant_images", self.config_file)
- self.__logger.info("Images needed for vEPC: %s", images)
- for image_name, image_file in six.iteritems(images):
- self.__logger.info("image: %s, file: %s", image_name, image_file)
- if image_file and image_name:
- image_creator = OpenStackImage(self.snaps_creds, ImageConfig(
- name=image_name, image_user='cloud', img_format='qcow2',
- image_file=image_file))
- image_id = image_creator.create().id
- cmd = ['juju', 'metadata', 'generate-image', '-d', '/root',
- '-i', image_id, '-s', image_name, '-r',
- self.snaps_creds.region_name if (
- self.snaps_creds.region_name) else 'RegionOne',
- '-u', self.public_auth_url]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- self.created_object.append(image_creator)
- self.__logger.info("Network ID : %s", net_id)
-
+ self._publish_image()
+ self.image_alt = self.publish_image_alt()
+ self.flavor_alt = self.create_flavor_alt()
self.__logger.info("Starting Juju Bootstrap process...")
- try:
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'bootstrap', 'abot-epc', 'abot-controller',
- '--metadata-source', '/root',
- '--constraints', 'mem=2G',
- '--bootstrap-series', 'xenial',
- '--config', 'network={}'.format(net_id),
- '--config', 'ssl-hostname-verification=false',
- '--config', 'use-floating-ip=true',
- '--config', 'use-default-secgroup=true',
- '--debug']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- except subprocess.CalledProcessError as cpe:
- self.__logger.error(
- "Exception with Juju Bootstrap: %s\n%s",
- cpe.cmd, cpe.output)
- return False
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue with Juju Bootstrap ...")
- return False
-
- return True
+ region_name = self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'timeout {JujuEpc.juju_timeout} '
+ f'/snap/bin/juju bootstrap abot-epc/{region_name} abot-controller '
+ '--agent-version 2.3.9 --metadata-source /home/ubuntu '
+ '--constraints mem=2G --bootstrap-series xenial '
+ f'--config network={self.network.id} '
+ '--config ssl-hostname-verification=false '
+ f'--config external-network={self.ext_net.id} '
+ '--config use-floating-ip=true '
+ '--config use-default-secgroup=true '
+ '--debug')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def check_app(self, name='abot-epc-basic', status='active'):
"""Check application status."""
- cmd = ['juju', 'status', '--format', 'short', name]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- ret = re.search(r'(?=workload:({})\))'.format(status), output)
- if ret:
- self.__logger.info("%s workload is %s", name, status)
- return True
- self.__logger.error("%s workload differs from %s", name, status)
- return False
+ for i in range(10):
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'/snap/bin/juju status --format short {name}')
+ output = stdout.read().decode("utf-8")
+ self.__logger.debug("stdout:\n%s", output)
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ continue
+ ret = re.search(
+ rf'(?=workload:({status})\))', output)
+ if ret:
+ self.__logger.info("%s workload is %s", name, status)
+ break
+ self.__logger.info(
+ "loop %d: %s workload differs from %s", i + 1, name, status)
+ time.sleep(60)
+ else:
+ self.__logger.error("%s workload differs from %s", name, status)
+ return False
+ return True
def deploy_vnf(self):
"""Deploy ABOT-OAI-EPC."""
self.__logger.info("Upload VNFD")
- descriptor = self.vnf['descriptor']
- self.__logger.info("Get or create flavor for all Abot-EPC")
- flavor_settings = FlavorConfig(
- name=self.vnf['requirements']['flavor']['name'],
- ram=self.vnf['requirements']['flavor']['ram_min'],
- disk=10,
- vcpus=1)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor_creator.create()
- self.created_object.append(flavor_creator)
-
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(
+ '/src/epc-requirements/abot_charm', remote_path='~/',
+ recursive=True)
self.__logger.info("Deploying Abot-epc bundle file ...")
- cmd = ['juju', 'deploy', '{}'.format(descriptor.get('file_name'))]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- self.__logger.info("Waiting for instances .....")
- try:
- cmd = ['timeout', '-t', JujuEpc.juju_timeout, 'juju-wait']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- self.__logger.info("Deployed Abot-epc on Openstack")
- except subprocess.CalledProcessError as cpe:
- self.__logger.error(
- "Exception with Juju VNF Deployment: %s\n%s",
- cpe.cmd, cpe.output)
- return False
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue with the VNF Deployment ..")
- return False
-
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'sudo mkdir -p /src/epc-requirements && '
+ 'sudo mv abot_charm /src/epc-requirements/abot_charm && '
+ '/snap/bin/juju deploy '
+ '/src/epc-requirements/abot_charm/functest-abot-epc-bundle/'
+ 'bundle.yaml')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'PATH=/snap/bin/:$PATH '
+ f'timeout {JujuEpc.juju_timeout} juju-wait')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
self.__logger.info("Checking status of ABot and EPC units ...")
- cmd = ['juju', 'status']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.debug("%s\n%s", " ".join(cmd), output)
+ (_, stdout, stderr) = self.ssh.exec_command('/snap/bin/juju status')
+ output = stdout.read().decode("utf-8")
+ self.__logger.debug("stdout:\n%s", output)
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
for app in ['abot-epc-basic', 'oai-epc', 'oai-hss']:
if not self.check_app(app):
return False
-
- nova_client = nova_utils.nova_client(self.snaps_creds)
- instances = get_instances(nova_client)
- self.__logger.info("List of Instance: %s", instances)
- for items in instances:
- metadata = get_instance_metadata(nova_client, items)
- if 'juju-units-deployed' in metadata:
- sec_group = 'juju-{}-{}'.format(
- metadata['juju-controller-uuid'],
- metadata['juju-model-uuid'])
- self.__logger.info("Instance: %s", sec_group)
- break
- self.__logger.info("Adding Security group rule....")
- # This will add sctp rule to a common Security Group Created
- # by juju and shared to all deployed units.
- self._add_custom_rule(sec_group)
-
- self.__logger.info("Transferring the feature files to Abot_node ...")
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'scp', '--', '-r', '-v',
- '{}/featureFiles'.format(self.case_dir), 'abot-epc-basic/0:~/']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- self.__logger.info("Copying the feature files within Abot_node ")
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'ssh', 'abot-epc-basic/0',
- 'sudo', 'cp', '-vfR', '~/featureFiles/*',
- '/etc/rebaca-test-suite/featureFiles']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- return True
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(
+ f'{self.case_dir}/featureFiles', remote_path='~/',
+ recursive=True)
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp -- -r -v '
+ '~/featureFiles abot-epc-basic/0:/etc/rebaca-test-suite/')
+ output = stdout.read().decode("utf-8")
+ self.__logger.debug("stdout:\n%s", output)
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def test_vnf(self):
"""Run test on ABoT."""
start_time = time.time()
- self.__logger.info("Running VNF Test cases....")
- cmd = ['juju', 'run-action', 'abot-epc-basic/0', 'run',
- 'tagnames={}'.format(self.details['test_vnf']['tag_name'])]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- cmd = ['timeout', '-t', JujuEpc.juju_timeout, 'juju-wait']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "/snap/bin/juju run-action abot-epc-basic/0 "
+ f"run tagnames={self.details['test_vnf']['tag_name']}")
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'PATH=/snap/bin/:$PATH '
+ f'timeout {JujuEpc.juju_timeout} juju-wait')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
duration = time.time() - start_time
self.__logger.info("Getting results from Abot node....")
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'scp', '--', '-v',
- 'abot-epc-basic/0:'
- '/var/lib/abot-epc-basic/artifacts/TestResults.json',
- '{}/.'.format(self.res_dir)]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp '
+ '-- -v abot-epc-basic/0:'
+ '/var/lib/abot-epc-basic/artifacts/TestResults.json .')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.get('TestResults.json', self.res_dir)
self.__logger.info("Parsing the Test results...")
- res = (process_abot_test_result('{}/TestResults.json'.format(
- self.res_dir)))
+ res = process_abot_test_result(f'{self.res_dir}/TestResults.json')
short_result = sig_test_format(res)
self.__logger.info(short_result)
self.details['test_vnf'].update(
@@ -459,99 +362,48 @@ class JujuEpc(vnf.VnfOnBoarding):
short_result['failures'], short_result['skipped'])
return True
- def _get_floating_ips(self):
- """Get the list of floating IPs associated with the current project"""
-
- project_id = self.os_project.get_project().id
-
- neutron_client = neutron_utils.neutron_client(self.snaps_creds)
- floating_ips = neutron_utils.get_floating_ips(neutron_client)
-
- project_floating_ip_list = list()
- for floating_ip in floating_ips:
- if project_id and project_id == floating_ip.project_id:
- project_floating_ip_list.append(floating_ip)
-
- return project_floating_ip_list
-
- def _release_floating_ips(self, fip_list):
- """
- Responsible for deleting a list of floating IPs
- :param fip_list: A list of SNAPS FloatingIp objects
- :return:
- """
- if not fip_list:
- return
-
- neutron_client = neutron_utils.neutron_client(self.snaps_creds)
-
- for floating_ip in fip_list:
- neutron_utils.delete_floating_ip(neutron_client, floating_ip)
-
- def clean(self):
- """Clean created objects/functions."""
-
- # Store Floating IPs of instances created by Juju
- fip_list = self._get_floating_ips()
- self.__logger.info("Floating IPs assigned to project:%s",
- self.os_project.get_project().name)
- for floating_ip in fip_list:
- self.__logger.debug("%s:%s", floating_ip.ip,
- floating_ip.description)
-
+ def execute(self):
+ """Prepare testcase (Additional pre-configuration steps)."""
+ assert self.public_auth_url
+ self.__logger.info("Additional pre-configuration steps")
+ try:
+ os.makedirs(self.res_dir)
+ except OSError as ex:
+ if ex.errno != errno.EEXIST:
+ self.__logger.exception("Cannot create %s", self.res_dir)
+ raise Exception from ex
+ self.__logger.info("ENV:\n%s", env.string())
try:
- cmd = ['juju', 'debug-log', '--replay', '--no-tail']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.debug("%s\n%s", " ".join(cmd), output)
- if not self.orchestrator['requirements']['preserve_setup']:
- self.__logger.info("Destroying Orchestrator...")
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'destroy-controller', '-y', 'abot-controller',
- '--destroy-all-models']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- except subprocess.CalledProcessError as cpe:
- self.__logger.error(
- "Exception with Juju Cleanup: %s\n%s",
- cpe.cmd, cpe.output)
+ assert self._install_juju()
+ assert self._install_juju_wait()
+ assert self._register_cloud()
+ assert self._register_credentials()
+ assert self.deploy_orchestrator()
+ assert self.deploy_vnf()
+ assert self.test_vnf()
except Exception: # pylint: disable=broad-except
- self.__logger.exception("General issue during the undeployment ..")
-
- if not self.orchestrator['requirements']['preserve_setup']:
- try:
- self.__logger.info('Release floating IPs assigned by Juju...')
- self._release_floating_ips(fip_list)
- except Exception: # pylint: disable=broad-except
- self.__logger.exception(
- "Exception while releasing floating IPs ...")
+ self.__logger.exception("juju_epc failed")
+ return 1
+ return 0
- self.__logger.info('Remove the Abot_epc OS objects ..')
- super(JujuEpc, self).clean()
-
- return True
-
-
-# ----------------------------------------------------------
-#
-# YAML UTILS
-#
-# -----------------------------------------------------------
-def get_config(parameter, file_path):
- """
- Returns the value of a given parameter in file.yaml
- parameter must be given in string format with dots
- Example: general.openstack.image_name
- """
- with open(file_path) as config_file:
- file_yaml = yaml.safe_load(config_file)
- config_file.close()
- value = file_yaml
- for element in parameter.split("."):
- value = value.get(element)
- if value is None:
- raise ValueError("The parameter %s is not defined in"
- " reporting.yaml" % parameter)
- return value
+ def clean(self):
+ """Clean created objects/functions."""
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju debug-log --replay --no-tail')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju destroy-controller -y abot-controller '
+ '--destroy-all-models')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ for fip in self.cloud.list_floating_ips():
+ self.cloud.delete_floating_ip(fip.id)
+ if self.image_alt:
+ self.cloud.delete_image(self.image_alt)
+ if self.flavor_alt:
+ self.orig_cloud.delete_flavor(self.flavor_alt.id)
+ super().clean()
def sig_test_format(sig_test):
@@ -577,7 +429,7 @@ def sig_test_format(sig_test):
def process_abot_test_result(file_path):
""" Process ABoT Result """
- with open(file_path) as test_result:
+ with open(file_path, encoding='utf-8') as test_result:
data = json.load(test_result)
res = []
for tests in data:
@@ -629,23 +481,3 @@ def update_data(obj):
raise
return obj
-
-
-def get_instances(nova_client):
- """ To get all vm info of a project """
- try:
- instances = nova_client.servers.list()
- return instances
- except Exception as exc: # pylint: disable=broad-except
- logging.error("Error [get_instances(nova_client)]: %s", exc)
- return None
-
-
-def get_instance_metadata(nova_client, instance):
- """ Get instance Metadata - Instance ID """
- try:
- instance = nova_client.servers.get(instance.id)
- return instance.metadata
- except Exception as exc: # pylint: disable=broad-except
- logging.error("Error [get_instance_status(nova_client)]: %s", exc)
- return None
diff --git a/functest/opnfv_tests/vnf/ims/clearwater.py b/functest/opnfv_tests/vnf/ims/clearwater.py
new file mode 100644
index 000000000..4c143fd70
--- /dev/null
+++ b/functest/opnfv_tests/vnf/ims/clearwater.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2017 All rights reserved
+# This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Ease testing any Clearwater deployment"""
+
+import logging
+import os
+import re
+import time
+
+import pkg_resources
+import requests
+
+from functest.utils import config
+import functest.utils.functest_utils as ft_utils
+
+__author__ = ("Valentin Boucher <valentin.boucher@orange.com>, "
+ "Helen Yao <helanyao@gmail.com>")
+
+
+class ClearwaterTesting():
+ """vIMS clearwater base usable by several orchestrators"""
+
+ def __init__(self, case_name, bono_ip, ellis_ip):
+ self.logger = logging.getLogger(__name__)
+ self.case_dir = pkg_resources.resource_filename(
+ 'functest', 'opnfv_tests/vnf/ims')
+ self.data_dir = getattr(config.CONF, 'dir_ims_data')
+ self.result_dir = os.path.join(
+ getattr(config.CONF, 'dir_results'), case_name)
+ self.test_dir = getattr(config.CONF, 'dir_repo_vims_test')
+
+ if not os.path.exists(self.data_dir):
+ os.makedirs(self.data_dir)
+ if not os.path.exists(self.result_dir):
+ os.makedirs(self.result_dir)
+
+ self.ellis_ip = ellis_ip
+ self.bono_ip = bono_ip
+
+ def availability_check(self, signup_code='secret', two_numbers=False):
+ """Create one or two numbers"""
+ assert self.ellis_ip
+ output_dict = {}
+ self.logger.debug('Ellis IP: %s', self.ellis_ip)
+ output_dict['ellis_ip'] = self.ellis_ip
+ account_url = f'http://{self.ellis_ip}/accounts'
+ params = {"password": "functest",
+ "full_name": "opnfv functest user",
+ "email": "functest@opnfv.org",
+ "signup_code": signup_code}
+ output_dict['login'] = params
+
+ number_res = self._create_ellis_account(account_url, params)
+ output_dict['number'] = number_res
+
+ session_url = f'http://{self.ellis_ip}/session'
+ session_data = {
+ 'username': params['email'],
+ 'password': params['password'],
+ 'email': params['email']
+ }
+ cookies = self._get_ellis_session_cookies(session_url, session_data)
+
+ number_url = (
+ f"http://{self.ellis_ip}/accounts/{params['email']}/numbers")
+ self.logger.debug('Create 1st calling number on Ellis')
+ number_res = self._create_ellis_number(number_url, cookies)
+
+ if two_numbers:
+ self.logger.debug('Create 2nd calling number on Ellis')
+ number_res = self._create_ellis_number(number_url, cookies)
+ output_dict['number2'] = number_res
+
+ return output_dict
+
+ def _create_ellis_account(self, account_url, params):
+ i = 80
+ for iloop in range(i):
+ try:
+ req = requests.post(account_url, data=params)
+ if req.status_code == 201:
+ account_res = req.json()
+ self.logger.info(
+ 'Account %s is created on Ellis\n%s',
+ params.get('full_name'), account_res)
+ return account_res
+ raise Exception("Cannot create ellis account")
+ except Exception: # pylint: disable=broad-except
+ self.logger.info(
+ "try %s: cannot create ellis account", iloop + 1)
+ time.sleep(30)
+ raise Exception(
+ f"Unable to create an account {params.get('full_name')}")
+
+ def _get_ellis_session_cookies(self, session_url, params):
+ i = 15
+ for iloop in range(i):
+ try:
+ req = requests.post(session_url, data=params)
+ if req.status_code == 201:
+ cookies = req.cookies
+ self.logger.debug('cookies: %s', cookies)
+ return cookies
+ raise Exception('Failed to get cookies for Ellis')
+ except Exception: # pylint: disable=broad-except
+ self.logger.info(
+ "try %s: cannot get cookies for Ellis", iloop + 1)
+ time.sleep(10)
+ raise Exception('Failed to get cookies for Ellis')
+
+ def _create_ellis_number(self, number_url, cookies):
+ i = 30
+ for iloop in range(i):
+ try:
+ req = requests.post(number_url, cookies=cookies)
+ if req.status_code == 200:
+ number_res = req.json()
+ self.logger.info(
+ 'Calling number is created: %s', number_res)
+ return number_res
+ if req and req.json():
+ reason = req.json()['reason']
+ else:
+ reason = req
+ self.logger.info("cannot create a number: %s", reason)
+ raise Exception('Failed to create a number')
+ except Exception: # pylint: disable=broad-except
+ self.logger.info(
+ "try %s: cannot create a number", iloop + 1)
+ time.sleep(25)
+ raise Exception('Failed to create a number')
+
+ def run_clearwater_live_test(self, public_domain, signup_code='secret'):
+ """Run the Clearwater live tests
+
+ It first runs dnsmasq to reach clearwater services by FQDN and then the
+ Clearwater live tests. All results are saved in ims_test_output.txt.
+
+ Returns:
+ - a dict containing the overall results
+ - None on error
+ """
+ # pylint: disable=too-many-locals,too-many-arguments
+ self.logger.info('Run Clearwater live test')
+ script = (f'cd {self.test_dir};'
+ f'rake test[{public_domain}] SIGNUP_CODE={signup_code}')
+ if self.bono_ip and self.ellis_ip:
+ subscript = f' PROXY={self.bono_ip} ELLIS={self.ellis_ip}'
+ script = f'{script}{subscript}'
+ script = f'{script} --trace'
+ cmd = f"/bin/sh -c '{script}'"
+ self.logger.debug('Live test cmd: %s', cmd)
+ output_file = os.path.join(self.result_dir, "ims_test_output.txt")
+ ft_utils.execute_command(cmd,
+ error_msg='Clearwater live test failed',
+ output_file=output_file)
+
+ with open(output_file, 'r', encoding='utf-8') as ofile:
+ result = ofile.read()
+
+ if result != "":
+ self.logger.debug(result)
+
+ vims_test_result = {}
+ try:
+ grp = re.search(
+ r'^(\d+) failures out of (\d+) tests run.*\n'
+ r'(\d+) tests skipped$', result, re.MULTILINE | re.DOTALL)
+ assert grp
+ vims_test_result["failures"] = int(grp.group(1))
+ vims_test_result["total"] = int(grp.group(2))
+ vims_test_result["skipped"] = int(grp.group(3))
+ vims_test_result['passed'] = (
+ int(grp.group(2)) - int(grp.group(3)) - int(grp.group(1)))
+ if vims_test_result['total'] - vims_test_result['skipped'] > 0:
+ vnf_test_rate = vims_test_result['passed'] / (
+ vims_test_result['total'] - vims_test_result['skipped'])
+ else:
+ vnf_test_rate = 0
+ except Exception: # pylint: disable=broad-except
+ self.logger.exception("Cannot parse live tests results")
+ return None, 0
+ return vims_test_result, vnf_test_rate
diff --git a/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py b/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
deleted file mode 100644
index add99468b..000000000
--- a/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
+++ /dev/null
@@ -1,183 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (c) 2017 All rights reserved
-# This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Ease testing any Clearwater deployment"""
-
-import logging
-import os
-import re
-import shlex
-import shutil
-import subprocess
-import time
-
-import pkg_resources
-import requests
-
-import functest.core.vnf as vnf
-from functest.utils import config
-import functest.utils.functest_utils as ft_utils
-
-__author__ = ("Valentin Boucher <valentin.boucher@orange.com>, "
- "Helen Yao <helanyao@gmail.com>")
-
-
-class ClearwaterOnBoardingBase(vnf.VnfOnBoarding):
- """vIMS clearwater base usable by several orchestrators"""
-
- def __init__(self, **kwargs):
- self.logger = logging.getLogger(__name__)
- super(ClearwaterOnBoardingBase, self).__init__(**kwargs)
- self.case_dir = pkg_resources.resource_filename(
- 'functest', 'opnfv_tests/vnf/ims')
- self.data_dir = getattr(config.CONF, 'dir_ims_data')
- self.result_dir = os.path.join(getattr(config.CONF, 'dir_results'),
- self.case_name)
- self.test_dir = getattr(config.CONF, 'dir_repo_vims_test')
-
- if not os.path.exists(self.data_dir):
- os.makedirs(self.data_dir)
- if not os.path.exists(self.result_dir):
- os.makedirs(self.result_dir)
-
- def config_ellis(self, ellis_ip, signup_code='secret', two_numbers=False):
- """Create one or two numbers"""
- output_dict = {}
- self.logger.debug('Configure Ellis: %s', ellis_ip)
- output_dict['ellis_ip'] = ellis_ip
- account_url = 'http://{0}/accounts'.format(ellis_ip)
- params = {"password": "functest",
- "full_name": "opnfv functest user",
- "email": "functest@opnfv.org",
- "signup_code": signup_code}
- req = requests.post(account_url, data=params)
- output_dict['login'] = params
- if req.status_code != 201 and req.status_code != 409:
- raise Exception(
- "Unable to create an account {}\n{}".format(
- params, req.text))
- self.logger.debug(
- 'Account %s is created on Ellis\n%s', params, req.json())
-
- session_url = 'http://{0}/session'.format(ellis_ip)
- session_data = {
- 'username': params['email'],
- 'password': params['password'],
- 'email': params['email']
- }
- req = requests.post(session_url, data=session_data)
- if req.status_code != 201:
- raise Exception('Failed to get cookie for Ellis\n{}'.format(
- req.text))
- cookies = req.cookies
- self.logger.debug('Cookies: %s', cookies)
-
- number_url = 'http://{0}/accounts/{1}/numbers'.format(
- ellis_ip, params['email'])
- self.logger.debug('Create 1st calling number on Ellis')
- i = 30
- while req.status_code != 200 and i > 0:
- try:
- number_res = self._create_ellis_number(number_url, cookies)
- break
- except Exception: # pylint: disable=broad-except
- if i == 1:
- self.logger.exception("Unable to create a number")
- raise Exception("Unable to create a number")
- self.logger.info("Unable to create a number. Retry ..")
- time.sleep(25)
- i = i - 1
- output_dict['number'] = number_res
-
- if two_numbers:
- self.logger.debug('Create 2nd calling number on Ellis')
- number_res = self._create_ellis_number(number_url, cookies)
- output_dict['number2'] = number_res
-
- return output_dict
-
- def _create_ellis_number(self, number_url, cookies):
- req = requests.post(number_url, cookies=cookies)
-
- if req.status_code != 200:
- if req and req.json():
- reason = req.json()['reason']
- else:
- reason = req
- raise Exception("Unable to create a number: %s" % reason)
- number_res = req.json()
- self.logger.info('Calling number is created: %s', number_res)
- return number_res
-
- def run_clearwater_live_test(self, dns_ip, public_domain,
- bono_ip=None, ellis_ip=None,
- signup_code='secret'):
- """Run the Clearwater live tests
-
- It first runs dnsmasq to reach clearwater services by FQDN and then the
- Clearwater live tests. All results are saved in ims_test_output.txt.
-
- Returns:
- - a dict containing the overall results
- - None on error
- """
- # pylint: disable=too-many-locals,too-many-arguments
- self.logger.info('Run Clearwater live test')
- dns_file = '/etc/resolv.conf'
- dns_file_bak = '/etc/resolv.conf.bak'
- self.logger.debug('Backup %s -> %s', dns_file, dns_file_bak)
- shutil.copy(dns_file, dns_file_bak)
- cmd = ("dnsmasq -d -u root --server=/clearwater.opnfv/{0} "
- "-r /etc/resolv.conf.bak".format(dns_ip))
- dnsmasq_process = subprocess.Popen(shlex.split(cmd))
- script = ('echo -e "nameserver {0}" > {1};'
- 'cd {2};'
- 'rake test[{3}] SIGNUP_CODE={4}'
- .format('127.0.0.1',
- dns_file,
- self.test_dir,
- public_domain,
- signup_code))
- if bono_ip and ellis_ip:
- subscript = ' PROXY={0} ELLIS={1}'.format(bono_ip, ellis_ip)
- script = '{0}{1}'.format(script, subscript)
- script = ('{0}{1}'.format(script, ' --trace'))
- cmd = "/bin/bash -c '{0}'".format(script)
- self.logger.debug('Live test cmd: %s', cmd)
- output_file = os.path.join(self.result_dir, "ims_test_output.txt")
- ft_utils.execute_command(cmd,
- error_msg='Clearwater live test failed',
- output_file=output_file)
- dnsmasq_process.kill()
- with open(dns_file_bak, 'r') as bak_file:
- result = bak_file.read()
- with open(dns_file, 'w') as dfile:
- dfile.write(result)
-
- with open(output_file, 'r') as ofile:
- result = ofile.read()
-
- if result != "":
- self.logger.debug(result)
-
- vims_test_result = {}
- try:
- grp = re.search(
- r'(\d+) failures out of (\d+) tests run.*'
- r'(\d+) tests skipped', result, re.MULTILINE | re.DOTALL)
- assert grp
- vims_test_result["failures"] = int(grp.group(1))
- vims_test_result["total"] = int(grp.group(2))
- vims_test_result["skipped"] = int(grp.group(3))
- vims_test_result['passed'] = (
- int(grp.group(2)) - int(grp.group(3)) - int(grp.group(1)))
- except Exception: # pylint: disable=broad-except
- self.logger.exception("Cannot parse live tests results")
- return None
- return vims_test_result
diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.py b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
index 786c535ed..b93af7d6d 100644
--- a/functest/opnfv_tests/vnf/ims/cloudify_ims.py
+++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
@@ -14,532 +14,249 @@ from __future__ import division
import logging
import os
import time
-import uuid
-from cloudify_rest_client import CloudifyClient
-from cloudify_rest_client.executions import Execution
-from scp import SCPClient
+import pkg_resources
import six
-from snaps.config.flavor import FlavorConfig
-from snaps.config.image import ImageConfig
-from snaps.config.keypair import KeypairConfig
-from snaps.config.network import NetworkConfig, PortConfig, SubnetConfig
-from snaps.config.router import RouterConfig
-from snaps.config.security_group import (
- Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig)
-from snaps.config.user import UserConfig
-from snaps.config.vm_inst import FloatingIpConfig, VmInstanceConfig
-from snaps.openstack.create_flavor import OpenStackFlavor
-from snaps.openstack.create_image import OpenStackImage
-from snaps.openstack.create_instance import OpenStackVmInstance
-from snaps.openstack.create_keypairs import OpenStackKeypair
-from snaps.openstack.create_network import OpenStackNetwork
-from snaps.openstack.create_router import OpenStackRouter
-from snaps.openstack.create_security_group import OpenStackSecurityGroup
-from snaps.openstack.create_user import OpenStackUser
-from snaps.openstack.utils import keystone_utils
-from xtesting.energy import energy
-import yaml
-
-from functest.opnfv_tests.openstack.snaps import snaps_utils
-from functest.opnfv_tests.vnf.ims import clearwater_ims_base
+
+from functest.core import cloudify
+from functest.opnfv_tests.vnf.ims import clearwater
from functest.utils import config
from functest.utils import env
+from functest.utils import functest_utils
__author__ = "Valentin Boucher <valentin.boucher@orange.com>"
-class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
+class CloudifyIms(cloudify.Cloudify):
"""Clearwater vIMS deployed with Cloudify Orchestrator Case."""
__logger = logging.getLogger(__name__)
+ filename_alt = ('/home/opnfv/functest/images/'
+ 'ubuntu-14.04-server-cloudimg-amd64-disk1.img')
+
+ flavor_alt_ram = 1024
+ flavor_alt_vcpus = 1
+ flavor_alt_disk = 3
+
+ quota_security_group = 20
+ quota_security_group_rule = 100
+ quota_port = 50
+
+ cop_yaml = ("https://github.com/cloudify-cosmo/cloudify-openstack-plugin/"
+ "releases/download/2.14.7/plugin.yaml")
+ cop_wgn = ("https://github.com/cloudify-cosmo/cloudify-openstack-plugin/"
+ "releases/download/2.14.7/cloudify_openstack_plugin-2.14.7-py27"
+ "-none-linux_x86_64-centos-Core.wgn")
+
def __init__(self, **kwargs):
"""Initialize CloudifyIms testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cloudify_ims"
- super(CloudifyIms, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ config.CONF, f'vnf_{self.case_name}_config')
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
+ self.case_dir = pkg_resources.resource_filename(
+ 'functest', 'opnfv_tests/vnf/ims')
config_file = os.path.join(self.case_dir, self.config)
- self.orchestrator = dict(
- requirements=get_config("orchestrator.requirements", config_file),
- )
+
self.details['orchestrator'] = dict(
- name=get_config("orchestrator.name", config_file),
- version=get_config("orchestrator.version", config_file),
+ name=functest_utils.get_parameter_from_yaml(
+ "orchestrator.name", config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "orchestrator.version", config_file),
status='ERROR',
result=''
)
- self.__logger.debug("Orchestrator configuration %s", self.orchestrator)
+
self.vnf = dict(
- descriptor=get_config("vnf.descriptor", config_file),
- inputs=get_config("vnf.inputs", config_file),
- requirements=get_config("vnf.requirements", config_file)
+ descriptor=functest_utils.get_parameter_from_yaml(
+ "vnf.descriptor", config_file),
+ inputs=functest_utils.get_parameter_from_yaml(
+ "vnf.inputs", config_file)
)
self.details['vnf'] = dict(
descriptor_version=self.vnf['descriptor']['version'],
- name=get_config("vnf.name", config_file),
- version=get_config("vnf.version", config_file),
+ name=functest_utils.get_parameter_from_yaml(
+ "vnf.name", config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "vnf.version", config_file),
)
self.__logger.debug("VNF configuration: %s", self.vnf)
self.details['test_vnf'] = dict(
- name=get_config("vnf_test_suite.name", config_file),
- version=get_config("vnf_test_suite.version", config_file)
+ name=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.name", config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.version", config_file)
)
- self.images = get_config("tenant_images", config_file)
- self.__logger.info("Images needed for vIMS: %s", self.images)
-
- def prepare(self):
- """Prepare testscase (Additional pre-configuration steps)."""
- super(CloudifyIms, self).prepare()
-
- self.__logger.info("Additional pre-configuration steps")
-
- compute_quotas = self.os_project.get_compute_quotas()
- network_quotas = self.os_project.get_network_quotas()
- for key, value in (
- self.vnf['requirements']['compute_quotas'].items()):
- setattr(compute_quotas, key, value)
+ self.image_alt = None
+ self.flavor_alt = None
+ self.clearwater = None
- for key, value in (
- self.vnf['requirements']['network_quotas'].items()):
- setattr(network_quotas, key, value)
+ def check_requirements(self):
+ if env.get('NEW_USER_ROLE').lower() == "admin":
+ self.__logger.warning(
+ "Defining NEW_USER_ROLE=admin will easily break the testcase "
+ "because Cloudify doesn't manage tenancy (e.g. subnet "
+ "overlapping)")
- compute_quotas = self.os_project.update_compute_quotas(compute_quotas)
- network_quotas = self.os_project.update_network_quotas(network_quotas)
-
- def deploy_orchestrator(self):
- # pylint: disable=too-many-locals,too-many-statements
+ def execute(self):
"""
Deploy Cloudify Manager.
network, security group, fip, VM creation
"""
+ assert super().execute() == 0
start_time = time.time()
-
- # orchestrator VM flavor
- self.__logger.info("Get or create flavor for cloudify manager vm ...")
- flavor_settings = FlavorConfig(
- name="{}-{}".format(
- self.orchestrator['requirements']['flavor']['name'],
- self.uuid),
- ram=self.orchestrator['requirements']['flavor']['ram_min'],
- disk=50,
- vcpus=2)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor_creator.create()
- self.created_object.append(flavor_creator)
-
- self.__logger.info("Creating a second user to bypass issues ...")
- user_creator = OpenStackUser(
- self.snaps_creds,
- UserConfig(
- name='cloudify_network_bug-{}'.format(self.uuid),
- password=str(uuid.uuid4()),
- project_name=self.tenant_name,
- domain_name=self.snaps_creds.user_domain_name,
- roles={'_member_': self.tenant_name}))
- user_creator.create()
- self.created_object.append(user_creator)
-
- snaps_creds = user_creator.get_os_creds(self.snaps_creds.project_name)
- self.__logger.debug("snaps creds: %s", snaps_creds)
-
- self.__logger.info("Creating keypair ...")
- kp_file = os.path.join(self.data_dir, "cloudify_ims.pem")
- keypair_settings = KeypairConfig(
- name='cloudify_ims_kp-{}'.format(self.uuid),
- private_filepath=kp_file)
- keypair_creator = OpenStackKeypair(snaps_creds, keypair_settings)
- keypair_creator.create()
- self.created_object.append(keypair_creator)
-
- # needs some images
- self.__logger.info("Upload some OS images if it doesn't exist")
- for image_name, image_file in six.iteritems(self.images):
- self.__logger.info("image: %s, file: %s", image_name, image_file)
- if image_file and image_name:
- image_creator = OpenStackImage(
- snaps_creds,
- ImageConfig(
- name=image_name, image_user='cloud',
- img_format='qcow2', image_file=image_file))
- image_creator.create()
- self.created_object.append(image_creator)
-
- # network creation
- self.__logger.info("Creating full network ...")
- subnet_settings = SubnetConfig(
- name='cloudify_ims_subnet-{}'.format(self.uuid),
- cidr='10.67.79.0/24',
- dns_nameservers=[env.get('NAMESERVER')])
- network_settings = NetworkConfig(
- name='cloudify_ims_network-{}'.format(self.uuid),
- subnet_settings=[subnet_settings])
- network_creator = OpenStackNetwork(snaps_creds, network_settings)
- network_creator.create()
- self.created_object.append(network_creator)
- ext_net_name = snaps_utils.get_ext_net_name(snaps_creds)
- router_creator = OpenStackRouter(
- snaps_creds,
- RouterConfig(
- name='cloudify_ims_router-{}'.format(self.uuid),
- external_gateway=ext_net_name,
- internal_subnets=[subnet_settings.name]))
- router_creator.create()
- self.created_object.append(router_creator)
-
- # security group creation
- self.__logger.info("Creating security group for cloudify manager vm")
- sg_rules = list()
- sg_rules.append(
- SecurityGroupRuleConfig(
- sec_grp_name="sg-cloudify-manager-{}".format(self.uuid),
- direction=Direction.ingress, protocol=Protocol.tcp,
- port_range_min=1, port_range_max=65535))
- sg_rules.append(
- SecurityGroupRuleConfig(
- sec_grp_name="sg-cloudify-manager-{}".format(self.uuid),
- direction=Direction.ingress, protocol=Protocol.udp,
- port_range_min=1, port_range_max=65535))
- security_group_creator = OpenStackSecurityGroup(
- snaps_creds,
- SecurityGroupConfig(
- name="sg-cloudify-manager-{}".format(self.uuid),
- rule_settings=sg_rules))
- security_group_creator.create()
- self.created_object.append(security_group_creator)
-
- image_settings = ImageConfig(
- name=self.orchestrator['requirements']['os_image'],
- image_user='centos',
- exists=True)
- port_settings = PortConfig(
- name='cloudify_manager_port-{}'.format(self.uuid),
- network_name=network_settings.name)
- manager_settings = VmInstanceConfig(
- name='cloudify_manager-{}'.format(self.uuid),
- flavor=flavor_settings.name,
- port_settings=[port_settings],
- security_group_names=[
- security_group_creator.sec_grp_settings.name],
- floating_ip_settings=[FloatingIpConfig(
- name='cloudify_manager_fip-{}'.format(self.uuid),
- port_name=port_settings.name,
- router_name=router_creator.router_settings.name)])
- manager_creator = OpenStackVmInstance(
- snaps_creds, manager_settings, image_settings,
- keypair_settings)
- self.__logger.info("Creating cloudify manager VM")
- manager_creator.create()
- self.created_object.append(manager_creator)
-
- public_auth_url = keystone_utils.get_endpoint(snaps_creds, 'identity')
-
+ self.orig_cloud.set_network_quotas(
+ self.project.project.name,
+ security_group=self.quota_security_group,
+ security_group_rule=self.quota_security_group_rule,
+ port=self.quota_port)
+ self.__logger.info("Put OpenStack creds in manager")
cfy_creds = dict(
- keystone_username=snaps_creds.username,
- keystone_password=snaps_creds.password,
- keystone_tenant_name=snaps_creds.project_name,
- keystone_url=public_auth_url,
- region=snaps_creds.region_name if snaps_creds.region_name else (
- 'RegionOne'),
- user_domain_name=snaps_creds.user_domain_name,
- project_domain_name=snaps_creds.project_domain_name)
+ keystone_username=self.project.user.name,
+ keystone_password=self.project.password,
+ keystone_tenant_name=self.project.project.name,
+ keystone_url=self.get_public_auth_url(self.orig_cloud),
+ region=os.environ.get('OS_REGION_NAME', 'RegionOne'),
+ user_domain_name=os.environ.get(
+ 'OS_USER_DOMAIN_NAME', 'Default'),
+ project_domain_name=os.environ.get(
+ 'OS_PROJECT_DOMAIN_NAME', 'Default'))
self.__logger.info("Set creds for cloudify manager %s", cfy_creds)
- cfy_client = CloudifyClient(
- host=manager_creator.get_floating_ip().ip,
- username='admin', password='admin', tenant='default_tenant',
- api_version='v3')
-
- self.orchestrator['object'] = cfy_client
-
- self.__logger.info("Attemps running status of the Manager")
for loop in range(10):
try:
- self.__logger.debug(
- "status %s", cfy_client.manager.get_status())
- cfy_status = cfy_client.manager.get_status()['status']
- self.__logger.info(
- "The current manager status is %s", cfy_status)
- if str(cfy_status) != 'running':
- raise Exception("Cloudify Manager isn't up and running")
- self.__logger.info("Put OpenStack creds in manager")
- secrets_list = cfy_client.secrets.list()
+ secrets_list = self.cfy_client.secrets.list()
for k, val in six.iteritems(cfy_creds):
if not any(d.get('key', None) == k for d in secrets_list):
- cfy_client.secrets.create(k, val)
+ self.cfy_client.secrets.create(k, val)
else:
- cfy_client.secrets.update(k, val)
+ self.cfy_client.secrets.update(k, val)
break
except Exception: # pylint: disable=broad-except
- self.logger.info(
- "try %s: Cloudify Manager isn't up and running", loop + 1)
+ self.__logger.info(
+ "try %s: Cannot create secrets", loop + 1)
time.sleep(30)
else:
- self.logger.error("Cloudify Manager isn't up and running")
- return False
+ self.__logger.error("Cannot create secrets")
+ return 1
duration = time.time() - start_time
- if manager_creator.vm_ssh_active(block=True):
- self.__logger.info("Put private keypair in manager")
- ssh = manager_creator.ssh_client()
- scp = SCPClient(ssh.get_transport(), socket_timeout=15.0)
- scp.put(kp_file, '~/')
- cmd = "sudo cp ~/cloudify_ims.pem /etc/cloudify/"
- self.run_blocking_ssh_command(ssh, cmd)
- cmd = "sudo chmod 444 /etc/cloudify/cloudify_ims.pem"
- self.run_blocking_ssh_command(ssh, cmd)
- # cmd2 is badly unpinned by Cloudify
- cmd = "sudo yum install -y gcc python-devel python-cmd2"
- self.run_blocking_ssh_command(
- ssh, cmd, "Unable to install packages on manager")
- self.run_blocking_ssh_command(ssh, 'cfy status')
- else:
- self.__logger.error("Cannot connect to manager")
- return False
+ self.put_private_key()
+ self.upload_cfy_plugins(self.cop_yaml, self.cop_wgn)
self.details['orchestrator'].update(status='PASS', duration=duration)
self.vnf['inputs'].update(dict(
- external_network_name=ext_net_name,
- network_name=network_settings.name,
- key_pair_name=keypair_settings.name
+ external_network_name=self.ext_net.name,
+ network_name=self.network.name,
+ key_pair_name=self.keypair.name
))
+ if self.deploy_vnf() and self.test_vnf():
+ self.result = 100
+ return 0
self.result = 1/3 * 100
- return True
+ return 1
def deploy_vnf(self):
"""Deploy Clearwater IMS."""
start_time = time.time()
+ secgroups = self.cloud.list_security_groups(
+ filters={'name': 'default',
+ 'project_id': self.project.project.id})
+ if secgroups:
+ secgroup = secgroups[0]
+ else:
+ self.__logger.error("No 'default' security group in project %s",
+ self.project.project.name)
+ return False
+
+ self.cloud.create_security_group_rule(
+ secgroup.id, port_range_min=22, port_range_max=22,
+ protocol='tcp', direction='ingress')
+
self.__logger.info("Upload VNFD")
- cfy_client = self.orchestrator['object']
descriptor = self.vnf['descriptor']
- cfy_client.blueprints.upload(
+ self.cfy_client.blueprints.upload(
descriptor.get('file_name'), descriptor.get('name'))
- self.__logger.info("Get or create flavor for all clearwater vm")
- flavor_settings = FlavorConfig(
- name="{}-{}".format(
- self.vnf['requirements']['flavor']['name'],
- self.uuid),
- ram=self.vnf['requirements']['flavor']['ram_min'],
- disk=25,
- vcpus=2)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor_creator.create()
- self.created_object.append(flavor_creator)
+ self.image_alt = self.publish_image_alt()
+ self.flavor_alt = self.create_flavor_alt()
self.vnf['inputs'].update(dict(
- flavor_id=flavor_settings.name,
+ image_id=self.image_alt.id,
+ flavor_id=self.flavor_alt.id,
))
self.__logger.info("Create VNF Instance")
- cfy_client.deployments.create(descriptor.get('name'),
- descriptor.get('name'),
- self.vnf.get('inputs'))
+ self.cfy_client.deployments.create(
+ descriptor.get('name'), descriptor.get('name'),
+ self.vnf.get('inputs'))
- wait_for_execution(
- cfy_client,
- get_execution_id(cfy_client, descriptor.get('name')),
+ cloudify.wait_for_execution(
+ self.cfy_client,
+ cloudify.get_execution_id(self.cfy_client, descriptor.get('name')),
self.__logger, timeout=300)
self.__logger.info("Start the VNF Instance deployment")
- execution = cfy_client.executions.start(descriptor.get('name'),
- 'install')
+ execution = self.cfy_client.executions.start(
+ descriptor.get('name'), 'install')
# Show execution log
- execution = wait_for_execution(
- cfy_client, execution, self.__logger, timeout=3600)
-
- duration = time.time() - start_time
+ execution = cloudify.wait_for_execution(
+ self.cfy_client, execution, self.__logger, timeout=3600)
self.__logger.info(execution)
- if execution.status == 'terminated':
- self.details['vnf'].update(status='PASS', duration=duration)
- self.result += 1/3 * 100
- result = True
- else:
- self.details['vnf'].update(status='FAIL', duration=duration)
- result = False
- return result
+ if execution.status != 'terminated':
+ self.details['vnf'].update(status='FAIL',
+ duration=time.time() - start_time)
+ return False
+
+ ellis_ip = self.cfy_client.deployments.outputs.get(
+ self.vnf['descriptor'].get('name'))['outputs']['ellis_ip']
+ bono_ip = self.cfy_client.deployments.outputs.get(
+ self.vnf['descriptor'].get('name'))['outputs']['bono_ip']
+ self.clearwater = clearwater.ClearwaterTesting(
+ self.case_name, bono_ip, ellis_ip)
+ self.clearwater.availability_check()
+
+ self.details['vnf'].update(status='PASS',
+ duration=time.time() - start_time)
+ self.result += 1/3 * 100
+ return True
def test_vnf(self):
"""Run test on clearwater ims instance."""
start_time = time.time()
-
- cfy_client = self.orchestrator['object']
-
- outputs = cfy_client.deployments.outputs.get(
- self.vnf['descriptor'].get('name'))['outputs']
- dns_ip = outputs['dns_ip']
- ellis_ip = outputs['ellis_ip']
- self.config_ellis(ellis_ip)
-
+ dns_ip = self.cfy_client.deployments.outputs.get(
+ self.vnf['descriptor'].get('name'))['outputs']['dns_ip']
if not dns_ip:
return False
-
- short_result = self.run_clearwater_live_test(
- dns_ip=dns_ip,
+ short_result, vnf_test_rate = self.clearwater.run_clearwater_live_test(
public_domain=self.vnf['inputs']["public_domain"])
duration = time.time() - start_time
self.__logger.info(short_result)
- self.details['test_vnf'].update(result=short_result,
- duration=duration)
- try:
- vnf_test_rate = short_result['passed'] / (
- short_result['total'] - short_result['skipped'])
- # orchestrator + vnf + test_vnf
- self.result += vnf_test_rate / 3 * 100
- except ZeroDivisionError:
- self.__logger.error("No test has been executed")
- self.details['test_vnf'].update(status='FAIL')
- return False
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot calculate results")
+ self.details['test_vnf'].update(result=short_result, duration=duration)
+ self.result += vnf_test_rate / 3 * 100
+ if vnf_test_rate == 0:
self.details['test_vnf'].update(status='FAIL')
- return False
- return True if vnf_test_rate > 0 else False
+ return bool(vnf_test_rate > 0)
def clean(self):
"""Clean created objects/functions."""
- try:
- cfy_client = self.orchestrator['object']
- dep_name = self.vnf['descriptor'].get('name')
- # kill existing execution
- self.__logger.info('Deleting the current deployment')
- exec_list = cfy_client.executions.list(dep_name)
- for execution in exec_list:
- if execution['status'] == "started":
- try:
- cfy_client.executions.cancel(execution['id'],
- force=True)
- except Exception: # pylint: disable=broad-except
- self.__logger.warn("Can't cancel the current exec")
-
- execution = cfy_client.executions.start(
- dep_name,
- 'uninstall',
- parameters=dict(ignore_failure=True),
- force=True)
-
- wait_for_execution(cfy_client, execution, self.__logger)
- cfy_client.deployments.delete(self.vnf['descriptor'].get('name'))
- cfy_client.blueprints.delete(self.vnf['descriptor'].get('name'))
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue during the undeployment ..")
-
- super(CloudifyIms, self).clean()
-
- @staticmethod
- def run_blocking_ssh_command(ssh, cmd,
- error_msg="Unable to run this command"):
- """Command to run ssh command with the exit status."""
- _, stdout, stderr = ssh.exec_command(cmd)
- CloudifyIms.__logger.debug("SSH %s stdout: %s", cmd, stdout.read())
- if stdout.channel.recv_exit_status() != 0:
- CloudifyIms.__logger.error("SSH %s stderr: %s", cmd, stderr.read())
- raise Exception(error_msg)
-
- @energy.enable_recording
- def run(self, **kwargs):
- """Execute CloudifyIms test case."""
- return super(CloudifyIms, self).run(**kwargs)
-
-
-# ----------------------------------------------------------
-#
-# YAML UTILS
-#
-# -----------------------------------------------------------
-def get_config(parameter, file_path):
- """
- Get config parameter.
-
- Returns the value of a given parameter in file.yaml
- parameter must be given in string format with dots
- Example: general.openstack.image_name
- """
- with open(file_path) as config_file:
- file_yaml = yaml.safe_load(config_file)
- config_file.close()
- value = file_yaml
- for element in parameter.split("."):
- value = value.get(element)
- if value is None:
- raise ValueError("The parameter %s is not defined in"
- " reporting.yaml" % parameter)
- return value
-
-
-def wait_for_execution(client, execution, logger, timeout=3600, ):
- """Wait for a workflow execution on Cloudify Manager."""
- # if execution already ended - return without waiting
- if execution.status in Execution.END_STATES:
- return execution
-
- if timeout is not None:
- deadline = time.time() + timeout
-
- # Poll for execution status and execution logs, until execution ends
- # and we receive an event of type in WORKFLOW_END_TYPES
- offset = 0
- batch_size = 50
- event_list = []
- execution_ended = False
- while True:
- event_list = client.events.list(
- execution_id=execution.id,
- _offset=offset,
- _size=batch_size,
- include_logs=True,
- sort='@timestamp').items
-
- offset = offset + len(event_list)
- for event in event_list:
- logger.debug(event.get('message'))
-
- if timeout is not None:
- if time.time() > deadline:
- raise RuntimeError(
- 'execution of operation {0} for deployment {1} '
- 'timed out'.format(execution.workflow_id,
- execution.deployment_id))
- else:
- # update the remaining timeout
- timeout = deadline - time.time()
-
- if not execution_ended:
- execution = client.executions.get(execution.id)
- execution_ended = execution.status in Execution.END_STATES
-
- if execution_ended:
- break
-
- time.sleep(5)
-
- return execution
-
-
-def get_execution_id(client, deployment_id):
- """
- Get the execution id of a env preparation.
-
- network, security group, fip, VM creation
- """
- executions = client.executions.list(deployment_id=deployment_id)
- for execution in executions:
- if execution.workflow_id == 'create_deployment_environment':
- return execution
- raise RuntimeError('Failed to get create_deployment_environment '
- 'workflow execution.'
- 'Available executions: {0}'.format(executions))
+ self.kill_existing_execution(self.vnf['descriptor'].get('name'))
+ if self.image_alt:
+ self.cloud.delete_image(self.image_alt)
+ if self.flavor_alt:
+ self.orig_cloud.delete_flavor(self.flavor_alt.id)
+ super().clean()
diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.yaml b/functest/opnfv_tests/vnf/ims/cloudify_ims.yaml
index 6808cf33d..869281a20 100644
--- a/functest/opnfv_tests/vnf/ims/cloudify_ims.yaml
+++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.yaml
@@ -1,35 +1,14 @@
---
-tenant_images:
- ubuntu_14.04:
- /home/opnfv/functest/images/ubuntu-14.04-server-cloudimg-amd64-disk1.img
- cloudify_manager_4.0:
- /home/opnfv/functest/images/cloudify-manager-premium-4.0.1.qcow2
orchestrator:
name: cloudify
version: '4.0'
- requirements:
- flavor:
- name: cloudify.medium
- ram_min: 4096
- os_image: 'cloudify_manager_4.0'
vnf:
name: clearwater
- version: '107'
+ version: '129'
descriptor:
- file_name: /src/vims/openstack-blueprint.yaml
+ file_name: /src/cloudify_vims/openstack-blueprint.yaml
name: clearwater-opnfv
- version: '122'
- requirements:
- flavor:
- name: cloudify.small
- ram_min: 2048
- compute_quotas:
- cores: 50
- instances: 15
- network_quotas:
- security_group: 20
- security_group_rule: 100
- port: 50
+ version: '129'
inputs:
image_id: 'ubuntu_14.04'
flavor_id: 'cloudify.small'
@@ -46,4 +25,4 @@ vnf:
homer_cluster_size: 1
vnf_test_suite:
name: clearwater-live-test
- version: "1.0"
+ version: '1.0'
diff --git a/functest/opnfv_tests/vnf/ims/heat_ims.py b/functest/opnfv_tests/vnf/ims/heat_ims.py
new file mode 100644
index 000000000..0d4e345a0
--- /dev/null
+++ b/functest/opnfv_tests/vnf/ims/heat_ims.py
@@ -0,0 +1,253 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Kontron, Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""HeatIms testcase implementation."""
+
+from __future__ import division
+
+import logging
+import os
+import re
+import time
+import tempfile
+
+import paramiko
+import pkg_resources
+from xtesting.core import testcase
+
+from functest.core import singlevm
+from functest.opnfv_tests.vnf.ims import clearwater
+from functest.utils import config
+from functest.utils import env
+from functest.utils import functest_utils
+
+__author__ = "Valentin Boucher <valentin.boucher@kontron.com>"
+
+
+class HeatIms(singlevm.VmReady2):
+ # pylint: disable=too-many-instance-attributes
+ """Clearwater vIMS deployed with Heat Orchestrator Case."""
+
+ __logger = logging.getLogger(__name__)
+
+ filename = ('/home/opnfv/functest/images/'
+ 'ubuntu-14.04-server-cloudimg-amd64-disk1.img')
+
+ flavor_ram = 1024
+ flavor_vcpus = 1
+ flavor_disk = 3
+
+ quota_security_group = 20
+ quota_security_group_rule = 100
+ quota_port = 50
+
+ parameters = {
+ 'private_mgmt_net_cidr': '192.168.100.0/24',
+ 'private_mgmt_net_gateway': '192.168.100.254',
+ 'private_mgmt_net_pool_start': '192.168.100.1',
+ 'private_mgmt_net_pool_end': '192.168.100.253'}
+
+ def __init__(self, **kwargs):
+ """Initialize HeatIms testcase object."""
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = "heat_ims"
+ super().__init__(**kwargs)
+
+ # Retrieve the configuration
+ try:
+ self.config = getattr(
+ config.CONF, f'vnf_{self.case_name}_config')
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
+
+ self.case_dir = pkg_resources.resource_filename(
+ 'functest', 'opnfv_tests/vnf/ims')
+ config_file = os.path.join(self.case_dir, self.config)
+
+ self.vnf = dict(
+ descriptor=functest_utils.get_parameter_from_yaml(
+ "vnf.descriptor", config_file),
+ parameters=functest_utils.get_parameter_from_yaml(
+ "vnf.inputs", config_file)
+ )
+ self.details['vnf'] = dict(
+ descriptor_version=self.vnf['descriptor']['version'],
+ name=functest_utils.get_parameter_from_yaml(
+ "vnf.name", config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "vnf.version", config_file),
+ )
+ self.__logger.debug("VNF configuration: %s", self.vnf)
+ self.keypair = None
+ self.stack = None
+ self.clearwater = None
+ self.role = None
+ (_, self.key_filename) = tempfile.mkstemp()
+
+ def create_network_resources(self):
+ pass
+
+ def execute(self):
+ # pylint: disable=too-many-locals,too-many-statements
+ """
+ Prepare Tenant/User
+
+ network, security group, fip, VM creation
+ """
+ self.orig_cloud.set_network_quotas(
+ self.project.project.name,
+ security_group=self.quota_security_group,
+ security_group_rule=self.quota_security_group_rule,
+ port=self.quota_port)
+ if not self.orig_cloud.get_role("heat_stack_owner"):
+ self.role = self.orig_cloud.create_role("heat_stack_owner")
+ self.orig_cloud.grant_role(
+ "heat_stack_owner", user=self.project.user.id,
+ project=self.project.project.id,
+ domain=self.project.domain.id)
+ self.keypair = self.cloud.create_keypair(
+ f'{self.case_name}-kp_{self.guid}')
+ self.__logger.info("keypair:\n%s", self.keypair.private_key)
+ with open(
+ self.key_filename, 'w', encoding='utf-8') as private_key_file:
+ private_key_file.write(self.keypair.private_key)
+
+ if self.deploy_vnf() and self.test_vnf():
+ self.result = 100
+ return 0
+ self.result = 1/3 * 100
+ return 1
+
+ def run(self, **kwargs):
+ """Deploy and test clearwater
+
+ Here are the main actions:
+ - deploy clearwater stack via heat
+ - test the vnf instance
+
+ Returns:
+ - TestCase.EX_OK
+ - TestCase.EX_RUN_ERROR on error
+ """
+ status = testcase.TestCase.EX_RUN_ERROR
+ try:
+ assert self.cloud
+ assert super().run(
+ **kwargs) == testcase.TestCase.EX_OK
+ self.result = 0
+ if not self.execute():
+ self.result = 100
+ status = testcase.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception('Cannot run %s', self.case_name)
+ finally:
+ self.stop_time = time.time()
+ return status
+
+ def _monit(self, username="ubuntu", timeout=60):
+ servers = self.cloud.list_servers(detailed=True)
+ self.__logger.debug("servers: %s", servers)
+ for server in servers:
+ if 'ns' in server.name:
+ break
+ self.__logger.info("server:\n%s", server.name)
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
+ ssh.connect(
+ server.public_v4, username=username,
+ key_filename=self.key_filename, timeout=timeout)
+ (_, stdout, _) = ssh.exec_command('sudo monit summary')
+ self.__logger.info("output:\n%s", stdout.read().decode("utf-8"))
+ ssh.close()
+
+ def deploy_vnf(self):
+ """Deploy Clearwater IMS."""
+ start_time = time.time()
+ descriptor = self.vnf['descriptor']
+ parameters = self.vnf['parameters']
+
+ parameters['public_mgmt_net_id'] = self.ext_net.id
+ parameters['flavor'] = self.flavor.name
+ parameters['image'] = self.image.name
+ parameters['key_name'] = self.keypair.name
+ parameters['external_mgmt_dns_ip'] = env.get('NAMESERVER')
+ parameters.update(self.parameters)
+
+ self.__logger.info("Create Heat Stack")
+ self.stack = self.cloud.create_stack(
+ name=descriptor.get('name'),
+ template_file=descriptor.get('file_name'),
+ wait=True, **parameters)
+ self.__logger.debug("stack: %s", self.stack)
+
+ self._monit()
+
+ servers = self.cloud.list_servers(detailed=True)
+ self.__logger.debug("servers: %s", servers)
+ for server in servers:
+ if not self.check_regex_in_console(
+ server.name, regex='Cloud-init .* finished at ', loop=1):
+ return False
+ if 'ellis' in server.name:
+ self.__logger.debug("ellis: %s", server)
+ ellis_ip = server.public_v4
+ elif 'bono' in server.name:
+ self.__logger.debug("bono: %s", server)
+ bono_ip = server.public_v4
+
+ assert ellis_ip
+ assert bono_ip
+ self.clearwater = clearwater.ClearwaterTesting(
+ self.case_name, bono_ip, ellis_ip)
+ # This call can take time and many retry because Heat is
+ # an infrastructure orchestrator so when Heat say "stack created"
+ # it means that all OpenStack ressources are created but not that
+ # Clearwater are up and ready (Cloud-Init script still running)
+ self.clearwater.availability_check()
+
+ duration = time.time() - start_time
+
+ self.details['vnf'].update(status='PASS', duration=duration)
+ self.result += 1/3 * 100
+
+ return True
+
+ def test_vnf(self):
+ """Run test on clearwater ims instance."""
+ start_time = time.time()
+ outputs = self.cloud.get_stack(self.stack.id).outputs
+ self.__logger.debug("stack outputs: %s", outputs)
+ dns_ip = re.findall(r'[0-9]+(?:\.[0-9]+){3}', str(outputs))[0]
+ if not dns_ip:
+ return False
+ short_result, vnf_test_rate = self.clearwater.run_clearwater_live_test(
+ public_domain=self.vnf['parameters']["zone"])
+ duration = time.time() - start_time
+ self.__logger.info(short_result)
+ self.details['test_vnf'] = dict(result=short_result, duration=duration)
+ self.result += vnf_test_rate / 3 * 100
+ if vnf_test_rate == 0:
+ self.details['test_vnf'].update(status='FAIL')
+ self._monit()
+ return bool(vnf_test_rate > 0)
+
+ def clean(self):
+ """Clean created objects/functions."""
+ assert self.cloud
+ try:
+ if self.stack:
+ self.cloud.delete_stack(self.stack.id, wait=True)
+ except TypeError:
+ # shade raises TypeError exceptions when checking stack status
+ pass
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean stack ressources")
+ super().clean()
+ if self.role:
+ self.orig_cloud.delete_role(self.role.id)
diff --git a/functest/opnfv_tests/vnf/ims/heat_ims.yaml b/functest/opnfv_tests/vnf/ims/heat_ims.yaml
new file mode 100644
index 000000000..2ccdc0bf7
--- /dev/null
+++ b/functest/opnfv_tests/vnf/ims/heat_ims.yaml
@@ -0,0 +1,22 @@
+---
+orchestrator:
+ name: heat
+ version: '4.0'
+vnf:
+ name: clearwater
+ version: '130'
+ descriptor:
+ file_name: /src/heat_vims/clearwater.yaml
+ name: clearwater-opnfv
+ version: '130'
+ inputs:
+ zone: clearwater.opnfv
+ dn_range_start: "6505550000"
+ dn_range_length: "1000"
+ bono_cluster_size: 1
+ sprout_cluster_size: 1
+ vellum_cluster_size: 1
+ dime_cluster_size: 1
+ homer_cluster_size: 1
+ dnssec_key:
+ GkBraPnditvP2Em4oXV5wUTawmZaGGuO+Jt3ZnFkznGV3zFoQ+Ak13nuuOnO0JV5FqAr/KitdW6siqjXSjROXg==
diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
index e56f23cfc..32d675347 100644
--- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
+++ b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
@@ -14,64 +14,54 @@
import logging
import os
import time
-import uuid
-
-from cloudify_rest_client import CloudifyClient
-from cloudify_rest_client.executions import Execution
-from scp import SCPClient
-import six
-from snaps.config.flavor import FlavorConfig
-from snaps.config.image import ImageConfig
-from snaps.config.keypair import KeypairConfig
-from snaps.config.network import NetworkConfig, PortConfig, SubnetConfig
-from snaps.config.router import RouterConfig
-from snaps.config.security_group import (
- Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig)
-from snaps.config.user import UserConfig
-from snaps.config.vm_inst import FloatingIpConfig, VmInstanceConfig
-from snaps.openstack.create_flavor import OpenStackFlavor
-from snaps.openstack.create_image import OpenStackImage
-from snaps.openstack.create_instance import OpenStackVmInstance
-from snaps.openstack.create_keypairs import OpenStackKeypair
-from snaps.openstack.create_network import OpenStackNetwork
-from snaps.openstack.create_security_group import OpenStackSecurityGroup
-from snaps.openstack.create_router import OpenStackRouter
-from snaps.openstack.create_user import OpenStackUser
-import snaps.openstack.utils.glance_utils as glance_utils
-from snaps.openstack.utils import keystone_utils
-
-from functest.opnfv_tests.openstack.snaps import snaps_utils
-import functest.opnfv_tests.vnf.router.vrouter_base as vrouter_base
+
+import pkg_resources
+
+from functest.core import cloudify
+from functest.opnfv_tests.vnf.router import vrouter_base
from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
from functest.utils import config
from functest.utils import env
from functest.utils import functest_utils
+
__author__ = "Shuya Nakama <shuya.nakama@okinawaopenlabs.org>"
-class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase):
+class CloudifyVrouter(cloudify.Cloudify):
# pylint: disable=too-many-instance-attributes
"""vrouter testcase deployed with Cloudify Orchestrator."""
__logger = logging.getLogger(__name__)
- name = __name__
+
+ filename_alt = '/home/opnfv/functest/images/vyos-1.1.8-amd64.qcow2'
+
+ flavor_alt_ram = 1024
+ flavor_alt_vcpus = 1
+ flavor_alt_disk = 3
+
+ check_console_loop = 12
+
+ cop_yaml = ("https://github.com/cloudify-cosmo/cloudify-openstack-plugin/"
+ "releases/download/2.14.7/plugin.yaml")
+ cop_wgn = ("https://github.com/cloudify-cosmo/cloudify-openstack-plugin/"
+ "releases/download/2.14.7/cloudify_openstack_plugin-2.14.7-py27"
+ "-none-linux_x86_64-centos-Core.wgn")
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vyos_vrouter"
- super(CloudifyVrouter, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
-
- self.cfy_manager_ip = ''
- self.deployment_name = ''
+ config.CONF, f'vnf_{self.case_name}_config')
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
+ self.case_dir = pkg_resources.resource_filename(
+ 'functest', 'opnfv_tests/vnf/router')
config_file = os.path.join(self.case_dir, self.config)
self.orchestrator = dict(
requirements=functest_utils.get_parameter_from_yaml(
@@ -86,7 +76,7 @@ class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase):
result=''
)
self.__logger.debug("Orchestrator configuration %s", self.orchestrator)
- self.__logger.debug("name = %s", self.name)
+ self.__logger.debug("name = %s", __name__)
self.vnf = dict(
descriptor=functest_utils.get_parameter_from_yaml(
"vnf.descriptor", config_file),
@@ -105,6 +95,10 @@ class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase):
self.__logger.debug("VNF configuration: %s", self.vnf)
self.util = Utilvnf()
+ self.util.set_credentials(self.cloud)
+ credentials = {"cloud": self.cloud}
+ self.util_info = {"credentials": credentials,
+ "vnf_data_dir": self.util.vnf_data_dir}
self.details['test_vnf'] = dict(
name=functest_utils.get_parameter_from_yaml(
@@ -116,263 +110,94 @@ class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase):
"tenant_images", config_file)
self.__logger.info("Images needed for vrouter: %s", self.images)
- @staticmethod
- def run_blocking_ssh_command(ssh, cmd,
- error_msg="Unable to run this command"):
- """Command to run ssh command with the exit status."""
- (_, stdout, stderr) = ssh.exec_command(cmd)
- CloudifyVrouter.__logger.debug("SSH %s stdout: %s", cmd, stdout.read())
- if stdout.channel.recv_exit_status() != 0:
- CloudifyVrouter.__logger.error(
- "SSH %s stderr: %s", cmd, stderr.read())
- raise Exception(error_msg)
-
- def prepare(self):
- super(CloudifyVrouter, self).prepare()
- self.__logger.info("Additional pre-configuration steps")
- self.util.set_credentials(self.snaps_creds)
-
- def deploy_orchestrator(self):
+ self.image_alt = None
+ self.flavor_alt = None
+
+ def check_requirements(self):
+ if env.get('NEW_USER_ROLE').lower() == "admin":
+ self.__logger.warning(
+ "Defining NEW_USER_ROLE=admin will easily break the testcase "
+ "because Cloudify doesn't manage tenancy (e.g. subnet "
+ "overlapping)")
+
+ def execute(self):
# pylint: disable=too-many-locals,too-many-statements
"""
Deploy Cloudify Manager.
network, security group, fip, VM creation
"""
# network creation
+ super().execute()
start_time = time.time()
+ self.put_private_key()
+ self.upload_cfy_plugins(self.cop_yaml, self.cop_wgn)
- # orchestrator VM flavor
- self.__logger.info("Get or create flavor for cloudify manager vm ...")
- flavor_settings = FlavorConfig(
- name="{}-{}".format(
- self.orchestrator['requirements']['flavor']['name'],
- self.uuid),
- ram=self.orchestrator['requirements']['flavor']['ram_min'],
- disk=50, vcpus=2)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor_creator.create()
- self.created_object.append(flavor_creator)
-
- user_creator = OpenStackUser(
- self.snaps_creds,
- UserConfig(
- name='cloudify_network_bug-{}'.format(self.uuid),
- password=str(uuid.uuid4()),
- project_name=self.tenant_name,
- domain_name=self.snaps_creds.user_domain_name,
- roles={'_member_': self.tenant_name}))
- user_creator.create()
- self.created_object.append(user_creator)
-
- snaps_creds = user_creator.get_os_creds(self.snaps_creds.project_name)
- self.__logger.debug("snaps creds: %s", snaps_creds)
-
- self.__logger.info("Creating keypair ...")
- kp_file = os.path.join(self.data_dir, "cloudify_vrouter.pem")
- keypair_settings = KeypairConfig(
- name='cloudify_vrouter_kp-{}'.format(self.uuid),
- private_filepath=kp_file)
- keypair_creator = OpenStackKeypair(snaps_creds, keypair_settings)
- keypair_creator.create()
- self.created_object.append(keypair_creator)
-
- self.__logger.info("Upload some OS images if it doesn't exist")
- for image_name, image_file in six.iteritems(self.images):
- self.__logger.info("image: %s, file: %s", image_name, image_file)
- if image_file and image_name:
- image_creator = OpenStackImage(
- snaps_creds,
- ImageConfig(
- name=image_name, image_user='cloud',
- img_format='qcow2', image_file=image_file))
- image_creator.create()
- self.created_object.append(image_creator)
-
- self.__logger.info("Creating full network ...")
- subnet_settings = SubnetConfig(
- name='cloudify_vrouter_subnet-{}'.format(self.uuid),
- cidr='10.67.79.0/24',
- dns_nameservers=[env.get('NAMESERVER')])
- network_settings = NetworkConfig(
- name='cloudify_vrouter_network-{}'.format(self.uuid),
- subnet_settings=[subnet_settings])
- network_creator = OpenStackNetwork(snaps_creds, network_settings)
- network_creator.create()
- self.created_object.append(network_creator)
- ext_net_name = snaps_utils.get_ext_net_name(snaps_creds)
- router_creator = OpenStackRouter(
- snaps_creds,
- RouterConfig(
- name='cloudify_vrouter_router-{}'.format(self.uuid),
- external_gateway=ext_net_name,
- internal_subnets=[subnet_settings.name]))
- router_creator.create()
- self.created_object.append(router_creator)
-
- # security group creation
- self.__logger.info("Creating security group for cloudify manager vm")
- sg_rules = list()
- sg_rules.append(
- SecurityGroupRuleConfig(
- sec_grp_name="sg-cloudify-manager-{}".format(self.uuid),
- direction=Direction.ingress,
- protocol=Protocol.tcp, port_range_min=1,
- port_range_max=65535))
- sg_rules.append(
- SecurityGroupRuleConfig(
- sec_grp_name="sg-cloudify-manager-{}".format(self.uuid),
- direction=Direction.ingress,
- protocol=Protocol.udp, port_range_min=1,
- port_range_max=65535))
- security_group_creator = OpenStackSecurityGroup(
- snaps_creds,
- SecurityGroupConfig(
- name="sg-cloudify-manager-{}".format(self.uuid),
- rule_settings=sg_rules))
- security_group_creator.create()
- self.created_object.append(security_group_creator)
-
- image_settings = ImageConfig(
- name=self.orchestrator['requirements']['os_image'],
- image_user='centos', exists=True)
- port_settings = PortConfig(
- name='cloudify_manager_port-{}'.format(self.uuid),
- network_name=network_settings.name)
- manager_settings = VmInstanceConfig(
- name='cloudify_manager-{}'.format(self.uuid),
- flavor=flavor_settings.name,
- port_settings=[port_settings],
- security_group_names=[
- security_group_creator.sec_grp_settings.name],
- floating_ip_settings=[FloatingIpConfig(
- name='cloudify_manager_fip-{}'.format(self.uuid),
- port_name=port_settings.name,
- router_name=router_creator.router_settings.name)])
- manager_creator = OpenStackVmInstance(
- snaps_creds, manager_settings, image_settings,
- keypair_settings)
-
- self.__logger.info("Creating cloudify manager VM")
- manager_creator.create()
- self.created_object.append(manager_creator)
-
- cfy_client = CloudifyClient(
- host=manager_creator.get_floating_ip().ip,
- username='admin', password='admin', tenant='default_tenant',
- api_version='v3')
-
- self.orchestrator['object'] = cfy_client
-
- self.cfy_manager_ip = manager_creator.get_floating_ip().ip
-
- self.__logger.info("Attemps running status of the Manager")
- for loop in range(10):
- try:
- self.__logger.debug(
- "status %s", cfy_client.manager.get_status())
- cfy_status = cfy_client.manager.get_status()['status']
- self.__logger.info(
- "The current manager status is %s", cfy_status)
- if str(cfy_status) != 'running':
- raise Exception("Cloudify Manager isn't up and running")
- break
- except Exception: # pylint: disable=broad-except
- self.logger.info(
- "try %s: Cloudify Manager isn't up and running", loop + 1)
- time.sleep(30)
- else:
- self.logger.error("Cloudify Manager isn't up and running")
- return False
+ self.image_alt = self.publish_image_alt()
+ self.flavor_alt = self.create_flavor_alt()
duration = time.time() - start_time
-
- self.__logger.info("Put private keypair in manager")
- if manager_creator.vm_ssh_active(block=True):
- ssh = manager_creator.ssh_client()
- scp = SCPClient(ssh.get_transport(), socket_timeout=15.0)
- scp.put(kp_file, '~/')
- cmd = "sudo cp ~/cloudify_vrouter.pem /etc/cloudify/"
- self.run_blocking_ssh_command(ssh, cmd)
- cmd = "sudo chmod 444 /etc/cloudify/cloudify_vrouter.pem"
- self.run_blocking_ssh_command(ssh, cmd)
- # cmd2 is badly unpinned by Cloudify
- cmd = "sudo yum install -y gcc python-devel python-cmd2"
- self.run_blocking_ssh_command(
- ssh, cmd, "Unable to install packages on manager")
- else:
- self.__logger.error("Cannot connect to manager")
- return False
-
self.details['orchestrator'].update(status='PASS', duration=duration)
- self.__logger.info("Get or create flavor for vrouter")
- flavor_settings = FlavorConfig(
- name="{}-{}".format(
- self.vnf['requirements']['flavor']['name'],
- self.uuid),
- ram=self.vnf['requirements']['flavor']['ram_min'],
- disk=25, vcpus=1)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor = flavor_creator.create()
- self.created_object.append(flavor_creator)
-
- # set image name
- glance = glance_utils.glance_client(snaps_creds)
- image = glance_utils.get_image(glance, "vyos1.1.7")
- self.vnf['inputs'].update(dict(external_network_name=ext_net_name))
- self.vnf['inputs'].update(dict(target_vnf_image_id=image.id))
- self.vnf['inputs'].update(dict(reference_vnf_image_id=image.id))
- self.vnf['inputs'].update(dict(target_vnf_flavor_id=flavor.id))
- self.vnf['inputs'].update(dict(reference_vnf_flavor_id=flavor.id))
self.vnf['inputs'].update(dict(
- keystone_username=snaps_creds.username))
+ external_network_name=self.ext_net.name))
self.vnf['inputs'].update(dict(
- keystone_password=snaps_creds.password))
+ target_vnf_image_id=self.image_alt.id))
self.vnf['inputs'].update(dict(
- keystone_tenant_name=snaps_creds.project_name))
+ reference_vnf_image_id=self.image_alt.id))
self.vnf['inputs'].update(dict(
- keystone_user_domain_name=snaps_creds.user_domain_name))
+ target_vnf_flavor_id=self.flavor_alt.id))
self.vnf['inputs'].update(dict(
- keystone_project_domain_name=snaps_creds.project_domain_name))
+ reference_vnf_flavor_id=self.flavor_alt.id))
self.vnf['inputs'].update(dict(
- region=snaps_creds.region_name if snaps_creds.region_name else (
- 'RegionOne')))
+ keystone_username=self.project.user.name))
self.vnf['inputs'].update(dict(
- keystone_url=keystone_utils.get_endpoint(
- snaps_creds, 'identity')))
-
- credentials = {"snaps_creds": snaps_creds}
- self.util_info = {"credentials": credentials,
- "cfy": cfy_client,
- "vnf_data_dir": self.util.vnf_data_dir}
+ keystone_password=self.project.password))
+ self.vnf['inputs'].update(dict(
+ keystone_tenant_name=self.project.project.name))
+ self.vnf['inputs'].update(dict(
+ keystone_user_domain_name=os.environ.get(
+ 'OS_USER_DOMAIN_NAME', 'Default')))
+ self.vnf['inputs'].update(dict(
+ keystone_project_domain_name=os.environ.get(
+ 'OS_PROJECT_DOMAIN_NAME', 'Default')))
+ self.vnf['inputs'].update(dict(
+ region=os.environ.get('OS_REGION_NAME', 'RegionOne')))
+ self.vnf['inputs'].update(dict(
+ keystone_url=self.get_public_auth_url(self.orig_cloud)))
- return True
+ if self.deploy_vnf() and self.test_vnf():
+ self.result = 100
+ return 0
+ self.result = 1/3 * 100
+ return 1
def deploy_vnf(self):
start_time = time.time()
-
self.__logger.info("Upload VNFD")
- cfy_client = self.orchestrator['object']
descriptor = self.vnf['descriptor']
- self.deployment_name = descriptor.get('name')
+ self.util_info["cfy"] = self.cfy_client
+ self.util_info["cfy_manager_ip"] = self.fip.floating_ip_address
+ self.util_info["deployment_name"] = descriptor.get('name')
- cfy_client.blueprints.upload(
+ self.cfy_client.blueprints.upload(
descriptor.get('file_name'), descriptor.get('name'))
self.__logger.info("Create VNF Instance")
- cfy_client.deployments.create(
+ self.cfy_client.deployments.create(
descriptor.get('name'), descriptor.get('name'),
self.vnf.get('inputs'))
- wait_for_execution(
- cfy_client, get_execution_id(cfy_client, descriptor.get('name')),
+ cloudify.wait_for_execution(
+ self.cfy_client, cloudify.get_execution_id(
+ self.cfy_client, descriptor.get('name')),
self.__logger, timeout=7200)
self.__logger.info("Start the VNF Instance deployment")
- execution = cfy_client.executions.start(descriptor.get('name'),
- 'install')
+ execution = self.cfy_client.executions.start(
+ descriptor.get('name'), 'install')
# Show execution log
- execution = wait_for_execution(cfy_client, execution, self.__logger)
+ execution = cloudify.wait_for_execution(
+ self.cfy_client, execution, self.__logger)
duration = time.time() - start_time
@@ -387,7 +212,8 @@ class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase):
def test_vnf(self):
start_time = time.time()
- result, test_result_data = super(CloudifyVrouter, self).test_vnf()
+ testing = vrouter_base.VrouterOnBoardingBase(self.util, self.util_info)
+ result, test_result_data = testing.test_vnf()
duration = time.time() - start_time
if result:
self.details['test_vnf'].update(
@@ -400,91 +226,9 @@ class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase):
return True
def clean(self):
- try:
- cfy_client = self.orchestrator['object']
- dep_name = self.vnf['descriptor'].get('name')
- # kill existing execution
- self.__logger.info('Deleting the current deployment')
- exec_list = cfy_client.executions.list(dep_name)
- for execution in exec_list:
- if execution['status'] == "started":
- try:
- cfy_client.executions.cancel(
- execution['id'], force=True)
- except Exception: # pylint: disable=broad-except
- self.__logger.warn("Can't cancel the current exec")
-
- execution = cfy_client.executions.start(
- dep_name, 'uninstall', parameters=dict(ignore_failure=True))
-
- wait_for_execution(cfy_client, execution, self.__logger)
- cfy_client.deployments.delete(self.vnf['descriptor'].get('name'))
- cfy_client.blueprints.delete(self.vnf['descriptor'].get('name'))
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue during the undeployment ..")
-
- super(CloudifyVrouter, self).clean()
-
- def get_vnf_info_list(self, target_vnf_name):
- return self.util.get_vnf_info_list(
- self.cfy_manager_ip, self.deployment_name, target_vnf_name)
-
-
-def wait_for_execution(client, execution, logger, timeout=7200, ):
- """Wait for a workflow execution on Cloudify Manager."""
- # if execution already ended - return without waiting
- if execution.status in Execution.END_STATES:
- return execution
-
- if timeout is not None:
- deadline = time.time() + timeout
-
- # Poll for execution status and execution logs, until execution ends
- # and we receive an event of type in WORKFLOW_END_TYPES
- offset = 0
- batch_size = 50
- event_list = []
- execution_ended = False
- while True:
- event_list = client.events.list(
- execution_id=execution.id, _offset=offset, _size=batch_size,
- include_logs=True, sort='@timestamp').items
-
- offset = offset + len(event_list)
- for event in event_list:
- logger.debug(event.get('message'))
-
- if timeout is not None:
- if time.time() > deadline:
- raise RuntimeError(
- 'execution of operation {0} for deployment {1} '
- 'timed out'.format(execution.workflow_id,
- execution.deployment_id))
- else:
- # update the remaining timeout
- timeout = deadline - time.time()
-
- if not execution_ended:
- execution = client.executions.get(execution.id)
- execution_ended = execution.status in Execution.END_STATES
-
- if execution_ended:
- break
-
- time.sleep(5)
-
- return execution
-
-
-def get_execution_id(client, deployment_id):
- """
- Get the execution id of a env preparation.
- network, security group, fip, VM creation
- """
- executions = client.executions.list(deployment_id=deployment_id)
- for execution in executions:
- if execution.workflow_id == 'create_deployment_environment':
- return execution
- raise RuntimeError('Failed to get create_deployment_environment '
- 'workflow execution.'
- 'Available executions: {0}'.format(executions))
+ self.kill_existing_execution(self.vnf['descriptor'].get('name'))
+ if self.image_alt:
+ self.cloud.delete_image(self.image_alt)
+ if self.flavor_alt:
+ self.orig_cloud.delete_flavor(self.flavor_alt.id)
+ super().clean()
diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml b/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml
index 649cd6ccd..2d98dffa5 100644
--- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml
+++ b/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml
@@ -3,9 +3,6 @@ tenant_images:
cloudify_manager_4.0:
/home/opnfv/functest/images/cloudify-manager-premium-4.0.1.qcow2
vyos1.1.7: /home/opnfv/functest/images/vyos-1.1.7.img
-test_data:
- url: 'https://github.com/oolorg/opnfv-vnf-data.git'
- branch: 'fraser'
orchestrator:
name: cloudify
version: '4.0'
diff --git a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
index be7bee889..9eb3c5d69 100644
--- a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
+++ b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
@@ -12,6 +12,7 @@
"""vrouter function test execution module"""
import logging
+import os
import time
import yaml
@@ -20,7 +21,7 @@ from functest.opnfv_tests.vnf.router.vnf_controller.vnf_controller import (
VnfController)
-class FunctionTestExec(object):
+class FunctionTestExec():
"""vrouter function test execution class"""
logger = logging.getLogger(__name__)
@@ -31,17 +32,16 @@ class FunctionTestExec(object):
credentials = util_info["credentials"]
self.vnf_ctrl = VnfController(util_info)
- test_cmd_map_file = open(self.util.vnf_data_dir +
- self.util.opnfv_vnf_data_dir +
- self.util.command_template_dir +
- self.util.test_cmd_map_yaml_file,
- 'r')
- self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
- test_cmd_map_file.close()
+ with open(
+ os.path.join(
+ self.util.vnf_data_dir, self.util.command_template_dir,
+ self.util.test_cmd_map_yaml_file),
+ 'r', encoding='utf-8') as test_cmd_map_file:
+ self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
- self.util.set_credentials(credentials["snaps_creds"])
+ self.util.set_credentials(credentials["cloud"])
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
diff --git a/functest/opnfv_tests/vnf/router/utilvnf.py b/functest/opnfv_tests/vnf/router/utilvnf.py
index 31e1b9196..111f20c1a 100644
--- a/functest/opnfv_tests/vnf/router/utilvnf.py
+++ b/functest/opnfv_tests/vnf/router/utilvnf.py
@@ -14,13 +14,9 @@
import json
import logging
import os
-import pkg_resources
import requests
import yaml
-from git import Repo
-from snaps.openstack.utils import nova_utils
-
from functest.utils import config
RESULT_SPRIT_INDEX = {
@@ -47,22 +43,19 @@ NUMBER_OF_DIGITS_FOR_AVG_JITTER = 3
NUMBER_OF_DIGITS_FOR_AVG_PKT_LOSS = 1
-class Utilvnf(object): # pylint: disable=too-many-instance-attributes
+class Utilvnf(): # pylint: disable=too-many-instance-attributes
""" Utility class of vrouter testcase """
logger = logging.getLogger(__name__)
def __init__(self):
- self.snaps_creds = ""
self.vnf_data_dir = getattr(config.CONF, 'dir_router_data')
- self.opnfv_vnf_data_dir = "opnfv-vnf-data/"
self.command_template_dir = "command_template/"
self.test_scenario_yaml = "test_scenario.yaml"
test_env_config_yaml_file = "test_env_config.yaml"
self.test_cmd_map_yaml_file = "test_cmd_map.yaml"
self.test_env_config_yaml = os.path.join(
self.vnf_data_dir,
- self.opnfv_vnf_data_dir,
test_env_config_yaml_file)
self.blueprint_dir = "opnfv-vnf-vyos-blueprint/"
@@ -71,29 +64,7 @@ class Utilvnf(object): # pylint: disable=too-many-instance-attributes
if not os.path.exists(self.vnf_data_dir):
os.makedirs(self.vnf_data_dir)
- case_dir = pkg_resources.resource_filename(
- 'functest', 'opnfv_tests/vnf/router')
-
- config_file_name = getattr(
- config.CONF, 'vnf_{}_config'.format("vyos_vrouter"))
-
- config_file = os.path.join(case_dir, config_file_name)
-
- with open(config_file) as file_fd:
- vrouter_config_yaml = yaml.safe_load(file_fd)
- file_fd.close()
-
- test_data = vrouter_config_yaml.get("test_data")
-
- self.logger.debug("Downloading the test data.")
- vrouter_data_path = self.vnf_data_dir + self.opnfv_vnf_data_dir
-
- if not os.path.exists(vrouter_data_path):
- Repo.clone_from(test_data['url'],
- vrouter_data_path,
- branch=test_data['branch'])
-
- with open(self.test_env_config_yaml) as file_fd:
+ with open(self.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
@@ -107,71 +78,27 @@ class Utilvnf(object): # pylint: disable=too-many-instance-attributes
os.remove(self.test_result_json_file)
self.logger.debug("removed %s", self.test_result_json_file)
- def get_nova_client(self):
- nova_client = nova_utils.nova_client(self.snaps_creds)
-
- return nova_client
+ self.cloud = None
- def set_credentials(self, snaps_creds):
- self.snaps_creds = snaps_creds
+ def set_credentials(self, cloud):
+ self.cloud = cloud
def get_address(self, server_name, network_name):
- nova_client = self.get_nova_client()
- servers_list = nova_client.servers.list()
- server = None
-
- for server in servers_list:
- if server.name == server_name:
- break
-
+ server = self.cloud.get_server(server_name)
address = server.addresses[
network_name][NOVA_CILENT_NETWORK_INFO_INDEX]["addr"]
return address
def get_mac_address(self, server_name, network_name):
- nova_client = self.get_nova_client()
- servers_list = nova_client.servers.list()
- server = None
-
- for server in servers_list:
- if server.name == server_name:
- break
-
+ server = self.cloud.get_server(server_name)
mac_address = server.addresses[network_name][
NOVA_CILENT_NETWORK_INFO_INDEX]["OS-EXT-IPS-MAC:mac_addr"]
return mac_address
- def reboot_vm(self, server_name):
- nova_client = self.get_nova_client()
- servers_list = nova_client.servers.list()
- server = None
-
- for server in servers_list:
- if server.name == server_name:
- break
-
- server.reboot()
-
- return
-
- def delete_vm(self, server_name):
- nova_client = self.get_nova_client()
- servers_list = nova_client.servers.list()
- server = None
-
- for server in servers_list:
- if server.name == server_name:
- nova_client.servers.delete(server)
- break
-
- return
-
def get_blueprint_outputs(self, cfy_manager_ip, deployment_name):
- url = "http://%s/deployments/%s/outputs" % (
- cfy_manager_ip, deployment_name)
-
+ url = f"http://{cfy_manager_ip}/deployments/{deployment_name}/outputs"
response = requests.get(
url,
auth=requests.auth.HTTPBasicAuth('admin', 'admin'),
@@ -200,15 +127,10 @@ class Utilvnf(object): # pylint: disable=too-many-instance-attributes
network_list.append(networks[network_name])
return network_list
- def request_vnf_reboot(self, vnf_info_list):
- for vnf in vnf_info_list:
- self.logger.debug("reboot the %s", vnf["vnf_name"])
- self.reboot_vm(vnf["vnf_name"])
-
def request_vm_delete(self, vnf_info_list):
for vnf in vnf_info_list:
self.logger.debug("delete the %s", vnf["vnf_name"])
- self.delete_vm(vnf["vnf_name"])
+ self.cloud.delete_server(vnf["vnf_name"])
def get_vnf_info_list(self, cfy_manager_ip, topology_deploy_name,
target_vnf_name):
@@ -288,24 +210,29 @@ class Utilvnf(object): # pylint: disable=too-many-instance-attributes
def write_result_data(self, result_data):
test_result = []
if not os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "w")
- file_fd.close()
+ with open(
+ self.test_result_json_file, "w",
+ encoding="utf-8") as file_fd:
+ pass
else:
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
+ with open(
+ self.test_result_json_file, "r",
+ encoding="utf-8") as file_fd:
+ test_result = json.load(file_fd)
test_result.append(result_data)
- file_fd = open(self.test_result_json_file, "w")
- json.dump(test_result, file_fd)
- file_fd.close()
+ with open(
+ self.test_result_json_file, "w",
+ encoding="utf-8") as file_fd:
+ json.dump(test_result, file_fd)
def output_test_result_json(self):
if os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
+ with open(
+ self.test_result_json_file, "r",
+ encoding="utf-8") as file_fd:
+ test_result = json.load(file_fd)
output_json_data = json.dumps(test_result,
sort_keys=True,
indent=4)
@@ -315,8 +242,6 @@ class Utilvnf(object): # pylint: disable=too-many-instance-attributes
@staticmethod
def get_test_scenario(file_path):
- test_scenario_file = open(file_path,
- 'r')
- test_scenario_yaml = yaml.safe_load(test_scenario_file)
- test_scenario_file.close()
+ with open(file_path, "r", encoding="utf-8") as test_scenario_file:
+ test_scenario_yaml = yaml.safe_load(test_scenario_file)
return test_scenario_yaml["test_scenario_list"]
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/checker.py b/functest/opnfv_tests/vnf/router/vnf_controller/checker.py
index a7a70f6d7..d3a216ed0 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/checker.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/checker.py
@@ -18,7 +18,7 @@ import re
from jinja2 import Environment, FileSystemLoader
-class Checker(object):
+class Checker():
"""vrouter test result check class"""
logger = logging.getLogger(__name__)
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py b/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py
index 7d9116bcc..a86a16485 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py
@@ -15,7 +15,7 @@ import logging
from jinja2 import Environment, FileSystemLoader
-class CommandGenerator(object):
+class CommandGenerator():
"""command generator class for vrouter testing"""
logger = logging.getLogger(__name__)
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
index c5f554cbd..269f6526b 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
@@ -24,7 +24,7 @@ DEFAULT_CONNECT_RETRY_COUNT = 10
DEFAULT_SEND_TIMEOUT = 10
-class SshClient(object): # pylint: disable=too-many-instance-attributes
+class SshClient(): # pylint: disable=too-many-instance-attributes
"""ssh client class for vrouter testing"""
logger = logging.getLogger(__name__)
@@ -43,7 +43,7 @@ class SshClient(object): # pylint: disable=too-many-instance-attributes
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.util = Utilvnf()
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
@@ -80,7 +80,7 @@ class SshClient(object): # pylint: disable=too-many-instance-attributes
retrycount -= 1
if retrycount == 0:
- self.logger.warn(
+ self.logger.warning(
"Cannot establish connection to IP '%s'", self.ip_address)
self.connected = False
return self.connected
@@ -110,7 +110,7 @@ class SshClient(object): # pylint: disable=too-many-instance-attributes
cmd)
break
- res_buff += res
+ res_buff += res.decode("utf-8")
self.logger.debug("Response : '%s'", res_buff)
return res_buff
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
index 79acc776f..2210b3909 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
@@ -23,7 +23,7 @@ from functest.opnfv_tests.vnf.router.vnf_controller.ssh_client import (
SshClient)
-class VmController(object):
+class VmController():
"""vm controll class"""
logger = logging.getLogger(__name__)
@@ -34,14 +34,12 @@ class VmController(object):
credentials = util_info["credentials"]
self.util = Utilvnf()
- self.util.set_credentials(credentials["snaps_creds"])
+ self.util.set_credentials(credentials["cloud"])
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
- self.reboot_wait = test_env_config_yaml.get("general").get(
- "reboot_wait")
self.command_wait = test_env_config_yaml.get("general").get(
"command_wait")
self.ssh_connect_timeout = test_env_config_yaml.get("general").get(
@@ -85,16 +83,10 @@ class VmController(object):
result = ssh.connect(self.ssh_connect_timeout,
self.ssh_connect_retry_count)
if not result:
- self.logger.warn("Reboot %s", vm_info["vnf_name"])
- self.util.reboot_vm(vm_info["vnf_name"])
- time.sleep(self.reboot_wait)
- result = ssh.connect(self.ssh_connect_timeout,
- self.ssh_connect_retry_count)
- if not result:
- self.logger.error(
- "Cannot establish connection to IP '%s'. Aborting!",
- ssh.ip_address)
- return None
+ self.logger.error(
+ "Cannot establish connection to IP '%s'. Aborting!",
+ ssh.ip_address)
+ return None
(result, _) = self.command_create_and_execute(
ssh,
@@ -109,10 +101,8 @@ class VmController(object):
def command_create_and_execute(self, ssh, test_cmd_file_path,
cmd_input_param, prompt_file_path):
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
+ with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file:
+ prompt = yaml.safe_load(prompt_file)
config_mode_prompt = prompt["config_mode"]
commands = self.command_gen_from_template(test_cmd_file_path,
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
index a5b1ad856..46584456f 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
@@ -26,7 +26,7 @@ from functest.opnfv_tests.vnf.router.vnf_controller.vm_controller import (
VmController)
-class VnfController(object):
+class VnfController():
"""vrouter controll class"""
logger = logging.getLogger(__name__)
@@ -36,7 +36,7 @@ class VnfController(object):
self.util = Utilvnf()
self.vm_controller = VmController(util_info)
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
@@ -49,10 +49,9 @@ class VnfController(object):
def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path,
parameter_file_path, prompt_file_path):
# pylint: disable=too-many-arguments
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
+ with open(
+ parameter_file_path, 'r', encoding='utf-8') as parameter_file:
+ cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"]
cmd_input_param["source_ip"] = source_vnf["data_plane_network_ip"]
@@ -71,19 +70,16 @@ class VnfController(object):
res_dict_data_list = []
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
+ with open(
+ parameter_file_path, 'r', encoding='utf-8') as parameter_file:
+ cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["source_ip"] = target_vnf["data_plane_network_ip"]
cmd_input_param["destination_ip"] = reference_vnf[
"data_plane_network_ip"]
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
+ with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file:
+ prompt = yaml.safe_load(prompt_file)
terminal_mode_prompt = prompt["terminal_mode"]
ssh = SshClient(target_vnf["floating_ip"],
diff --git a/functest/opnfv_tests/vnf/router/vrouter_base.py b/functest/opnfv_tests/vnf/router/vrouter_base.py
index 6c4e5ce0d..932770b9c 100644
--- a/functest/opnfv_tests/vnf/router/vrouter_base.py
+++ b/functest/opnfv_tests/vnf/router/vrouter_base.py
@@ -19,37 +19,22 @@ import time
import pkg_resources
-import functest.core.vnf as vnf
-from functest.utils import config
from functest.opnfv_tests.vnf.router.test_controller import function_test_exec
-from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
__author__ = "Shuya Nakama <shuya.nakama@okinawaopenlabs.org>"
-REBOOT_WAIT = 30
-
-class VrouterOnBoardingBase(vnf.VnfOnBoarding):
+class VrouterOnBoardingBase():
"""vrouter testing base class"""
- def __init__(self, **kwargs):
+ def __init__(self, util, util_info):
self.logger = logging.getLogger(__name__)
- super(VrouterOnBoardingBase, self).__init__(**kwargs)
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/router')
- self.data_dir = getattr(config.CONF, 'dir_router_data')
- self.result_dir = os.path.join(getattr(config.CONF, 'dir_results'),
- self.case_name)
- self.util = Utilvnf()
- self.util_info = {}
-
+ self.util = util
+ self.util_info = util_info
self.vnf_list = []
- if not os.path.exists(self.data_dir):
- os.makedirs(self.data_dir)
- if not os.path.exists(self.result_dir):
- os.makedirs(self.result_dir)
-
def test_vnf(self):
"""vrouter test execution"""
result = False
@@ -89,10 +74,6 @@ class VrouterOnBoardingBase(vnf.VnfOnBoarding):
vnf_info_list = self.get_vnf_info_list(target_vnf_name)
self.vnf_list = vnf_info_list
- self.logger.debug("request vnf's reboot.")
- self.util.request_vnf_reboot(vnf_info_list)
- time.sleep(REBOOT_WAIT)
-
target_vnf = self.util.get_target_vnf(vnf_info_list)
reference_vnf_list = self.util.get_reference_vnf_list(vnf_info_list)
@@ -117,6 +98,7 @@ class VrouterOnBoardingBase(vnf.VnfOnBoarding):
return result, test_result_data
def get_vnf_info_list(self, target_vnf_name):
- # pylint: disable=unused-argument,no-self-use
- vnf_info_list = []
- return vnf_info_list
+ return self.util.get_vnf_info_list(
+ self.util_info["cfy_manager_ip"],
+ self.util_info["deployment_name"],
+ target_vnf_name)