aboutsummaryrefslogtreecommitdiffstats
path: root/functest/opnfv_tests/vnf/epc/juju_epc.py
diff options
context:
space:
mode:
Diffstat (limited to 'functest/opnfv_tests/vnf/epc/juju_epc.py')
-rw-r--r--functest/opnfv_tests/vnf/epc/juju_epc.py704
1 files changed, 268 insertions, 436 deletions
diff --git a/functest/opnfv_tests/vnf/epc/juju_epc.py b/functest/opnfv_tests/vnf/epc/juju_epc.py
index 3f2a9ff93..1cf240b80 100644
--- a/functest/opnfv_tests/vnf/epc/juju_epc.py
+++ b/functest/opnfv_tests/vnf/epc/juju_epc.py
@@ -14,35 +14,16 @@ import os
import time
import json
import re
-import subprocess
import sys
-import uuid
+
from copy import deepcopy
import pkg_resources
-import yaml
-
-import six
-from snaps.config.flavor import FlavorConfig
-from snaps.config.image import ImageConfig
-from snaps.config.network import NetworkConfig, SubnetConfig
-from snaps.config.router import RouterConfig
-from snaps.config.security_group import (
- Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig)
-from snaps.config.user import UserConfig
-from snaps.openstack.create_flavor import OpenStackFlavor
-from snaps.openstack.create_image import OpenStackImage
-from snaps.openstack.create_network import OpenStackNetwork
-from snaps.openstack.create_router import OpenStackRouter
-from snaps.openstack.create_security_group import OpenStackSecurityGroup
-from snaps.openstack.create_user import OpenStackUser
-from snaps.openstack.utils import keystone_utils
-from snaps.openstack.utils import nova_utils
-from snaps.openstack.utils import neutron_utils
-
-from functest.core import vnf
-from functest.opnfv_tests.openstack.snaps import snaps_utils
+import scp
+
+from functest.core import singlevm
from functest.utils import config
from functest.utils import env
+from functest.utils import functest_utils
__author__ = "Amarendra Meher <amarendra@rebaca.com>"
__author__ = "Soumaya K Nayek <soumaya.nayek@rebaca.com>"
@@ -61,188 +42,186 @@ CREDS_TEMPLATE2 = """credentials:
default-credential: abot-epc
abot-epc:
auth-type: userpass
- password: {pass}
+ password: '{pass}'
project-domain-name: {project_domain_n}
tenant-name: {tenant_n}"""
-CREDS_TEMPLATE3 = """credentials:
+CREDS_TEMPLATE = """credentials:
abot-epc:
default-credential: abot-epc
abot-epc:
auth-type: userpass
- password: {pass}
+ password: '{pass}'
project-domain-name: {project_domain_n}
tenant-name: {tenant_n}
user-domain-name: {user_domain_n}
username: {user_n}"""
-class JujuEpc(vnf.VnfOnBoarding):
+class JujuEpc(singlevm.SingleVm2):
# pylint:disable=too-many-instance-attributes
"""Abot EPC deployed with JUJU Orchestrator Case"""
__logger = logging.getLogger(__name__)
- juju_timeout = '3600'
+ cidr = '192.168.120.0/24'
+
+ filename = ('/home/opnfv/functest/images/'
+ 'ubuntu-16.04-server-cloudimg-amd64-disk1.img')
+ filename_alt = ('/home/opnfv/functest/images/'
+ 'ubuntu-14.04-server-cloudimg-amd64-disk1.img')
+
+ flavor_ram = 2048
+ flavor_vcpus = 1
+ flavor_disk = 10
+ flavor_alt_ram = 4096
+ flavor_alt_vcpus = 1
+ flavor_alt_disk = 10
+ username = 'ubuntu'
+ juju_timeout = '4800'
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "juju_epc"
- super(JujuEpc, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/epc')
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ config.CONF, f'vnf_{self.case_name}_config')
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.config_file = os.path.join(self.case_dir, self.config)
- self.orchestrator = dict(requirements=get_config(
- "orchestrator.requirements", self.config_file))
+ self.orchestrator = dict(
+ requirements=functest_utils.get_parameter_from_yaml(
+ "orchestrator.requirements", self.config_file))
self.created_object = []
self.details['orchestrator'] = dict(
- name=get_config("orchestrator.name", self.config_file),
- version=get_config("orchestrator.version", self.config_file),
+ name=functest_utils.get_parameter_from_yaml(
+ "orchestrator.name", self.config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "orchestrator.version", self.config_file),
status='ERROR',
result=''
)
self.vnf = dict(
- descriptor=get_config("vnf.descriptor", self.config_file),
- requirements=get_config("vnf.requirements", self.config_file)
+ descriptor=functest_utils.get_parameter_from_yaml(
+ "vnf.descriptor", self.config_file),
+ requirements=functest_utils.get_parameter_from_yaml(
+ "vnf.requirements", self.config_file)
)
self.details['vnf'] = dict(
descriptor_version=self.vnf['descriptor']['version'],
- name=get_config("vnf.name", self.config_file),
- version=get_config("vnf.version", self.config_file),
+ name=functest_utils.get_parameter_from_yaml(
+ "vnf.name", self.config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "vnf.version", self.config_file),
)
self.__logger.debug("VNF configuration: %s", self.vnf)
self.details['test_vnf'] = dict(
- name=get_config("vnf_test_suite.name", self.config_file),
- version=get_config("vnf_test_suite.version", self.config_file),
- tag_name=get_config("vnf_test_suite.tag_name", self.config_file)
+ name=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.name", self.config_file),
+ version=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.version", self.config_file),
+ tag_name=functest_utils.get_parameter_from_yaml(
+ "vnf_test_suite.tag_name", self.config_file)
)
- self.public_auth_url = None
self.res_dir = os.path.join(
getattr(config.CONF, 'dir_results'), self.case_name)
- def _bypass_juju_netdiscovery_bug(self, name):
- user_creator = OpenStackUser(
- self.snaps_creds,
- UserConfig(
- name=name,
- password=str(uuid.uuid4()),
- project_name=self.tenant_name,
- domain_name=self.snaps_creds.user_domain_name,
- roles={'_member_': self.tenant_name}))
- user_creator.create()
- self.created_object.append(user_creator)
- return user_creator
+ try:
+ self.public_auth_url = self.get_public_auth_url(self.orig_cloud)
+ if not self.public_auth_url.endswith(('v3', 'v3/')):
+ self.public_auth_url = f"{self.public_auth_url}/v3"
+ except Exception: # pylint: disable=broad-except
+ self.public_auth_url = None
+ self.sec = None
+ self.image_alt = None
+ self.flavor_alt = None
+
+ def _install_juju(self):
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'sudo snap install juju --channel=2.3/stable --classic')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def _install_juju_wait(self):
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'sudo apt-get update && sudo apt-get install python3-pip -y && '
+ 'sudo pip3 install juju_wait===2.6.4')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def _register_cloud(self):
+ assert self.public_auth_url
self.__logger.info("Creating Cloud for Abot-epc .....")
clouds_yaml = os.path.join(self.res_dir, "clouds.yaml")
cloud_data = {
'url': self.public_auth_url,
- 'region': self.snaps_creds.region_name if (
- self.snaps_creds.region_name) else 'RegionOne'}
- with open(clouds_yaml, 'w') as yfile:
+ 'region': self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')}
+ with open(clouds_yaml, 'w', encoding='utf-8') as yfile:
yfile.write(CLOUD_TEMPLATE.format(**cloud_data))
- cmd = ['juju', 'add-cloud', 'abot-epc', '-f', clouds_yaml, '--replace']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- def _register_credentials_v2(self):
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(clouds_yaml, remote_path='~/')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju add-cloud abot-epc -f clouds.yaml --replace')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def _register_credentials(self):
self.__logger.info("Creating Credentials for Abot-epc .....")
- user_creator = self._bypass_juju_netdiscovery_bug(
- 'juju_network_discovery_bug')
- snaps_creds = user_creator.get_os_creds(self.snaps_creds.project_name)
- self.__logger.debug("snaps creds: %s", snaps_creds)
credentials_yaml = os.path.join(self.res_dir, "credentials.yaml")
creds_data = {
- 'pass': snaps_creds.password,
- 'tenant_n': snaps_creds.project_name,
- 'user_n': snaps_creds.username}
- with open(credentials_yaml, 'w') as yfile:
- yfile.write(CREDS_TEMPLATE2.format(**creds_data))
- cmd = ['juju', 'add-credential', 'abot-epc', '-f', credentials_yaml,
- '--replace']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- def _register_credentials_v3(self):
- self.__logger.info("Creating Credentials for Abot-epc .....")
- user_creator = self._bypass_juju_netdiscovery_bug(
- 'juju_network_discovery_bug')
- snaps_creds = user_creator.get_os_creds(self.snaps_creds.project_name)
- self.__logger.debug("snaps creds: %s", snaps_creds)
- credentials_yaml = os.path.join(self.res_dir, "credentials.yaml")
- creds_data = {
- 'pass': snaps_creds.password,
- 'tenant_n': snaps_creds.project_name,
- 'user_n': snaps_creds.username,
- 'project_domain_n': snaps_creds.project_domain_name,
- 'user_domain_n': snaps_creds.user_domain_name}
- with open(credentials_yaml, 'w') as yfile:
- yfile.write(CREDS_TEMPLATE3.format(**creds_data))
- cmd = ['juju', 'add-credential', 'abot-epc', '-f', credentials_yaml,
- '--replace']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- def _add_custom_rule(self, sec_grp_name):
- """ To add custom rule for SCTP Traffic """
-
- security_group = OpenStackSecurityGroup(
- self.snaps_creds,
- SecurityGroupConfig(
- name=sec_grp_name))
-
- security_group.create()
-
- # Add custom security rule to the obtained Security Group
- self.__logger.info("Adding SCTP ingress rule to SG:%s",
- security_group.sec_grp_settings.name)
-
- try:
- security_group.add_rule(SecurityGroupRuleConfig(
- sec_grp_name=sec_grp_name, direction=Direction.ingress,
- protocol=Protocol.sctp))
- except Exception: # pylint: disable=broad-except
- self.__logger.exception(
- "Some issue encountered with adding SCTP security rule ...")
-
- def prepare(self):
- """Prepare testcase (Additional pre-configuration steps)."""
- self.__logger.info("Additional pre-configuration steps")
- super(JujuEpc, self).prepare()
- try:
- os.makedirs(self.res_dir)
- except OSError as ex:
- if ex.errno != errno.EEXIST:
- self.__logger.exception("Cannot create %s", self.res_dir)
- raise vnf.VnfPreparationException
-
- self.__logger.info("ENV:\n%s", env.string())
-
- self.public_auth_url = keystone_utils.get_endpoint(
- self.snaps_creds, 'identity')
-
- # it enforces a versioned public identity endpoint as juju simply
- # adds /auth/tokens wich fails vs an unversioned endpoint.
- if not self.public_auth_url.endswith(('v3', 'v3/', 'v2.0', 'v2.0/')):
- self.public_auth_url = six.moves.urllib.parse.urljoin(
- self.public_auth_url, 'v3')
- self._register_cloud()
- if self.snaps_creds.identity_api_version == 3:
- self._register_credentials_v3()
- else:
- self._register_credentials_v2()
+ 'pass': self.project.password,
+ 'tenant_n': self.project.project.name,
+ 'user_n': self.project.user.name,
+ 'project_domain_n': self.cloud.auth.get(
+ "project_domain_name", "Default"),
+ 'user_domain_n': self.cloud.auth.get(
+ "user_domain_name", "Default")}
+ with open(credentials_yaml, 'w', encoding='utf-8') as yfile:
+ yfile.write(CREDS_TEMPLATE.format(**creds_data))
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(credentials_yaml, remote_path='~/')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju add-credential abot-epc -f credentials.yaml '
+ ' --replace --debug')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def _publish_image(self):
+ region_name = self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju metadata generate-image -d /home/ubuntu '
+ f'-i {self.image.id} -s xenial -r {region_name} '
+ f'-u {self.public_auth_url}')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def publish_image_alt(self, name=None):
+ image_alt = super().publish_image_alt(name)
+ region_name = self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju metadata generate-image -d /home/ubuntu '
+ f'-i {image_alt.id} -s trusty -r {region_name} '
+ f'-u {self.public_auth_url}')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return image_alt
def deploy_orchestrator(self): # pylint: disable=too-many-locals
"""
@@ -250,204 +229,128 @@ class JujuEpc(vnf.VnfOnBoarding):
Bootstrap juju
"""
- self.__logger.info("Deploying Juju Orchestrator")
- private_net_name = getattr(
- config.CONF, 'vnf_{}_private_net_name'.format(self.case_name))
- private_subnet_name = '{}-{}'.format(
- getattr(config.CONF,
- 'vnf_{}_private_subnet_name'.format(self.case_name)),
- self.uuid)
- private_subnet_cidr = getattr(
- config.CONF, 'vnf_{}_private_subnet_cidr'.format(self.case_name))
- abot_router = '{}-{}'.format(
- getattr(config.CONF,
- 'vnf_{}_external_router'.format(self.case_name)),
- self.uuid)
- self.__logger.info("Creating full network with nameserver: %s",
- env.get('NAMESERVER'))
- subnet_settings = SubnetConfig(
- name=private_subnet_name,
- cidr=private_subnet_cidr,
- dns_nameservers=[env.get('NAMESERVER')])
- network_settings = NetworkConfig(
- name=private_net_name, subnet_settings=[subnet_settings])
- network_creator = OpenStackNetwork(self.snaps_creds, network_settings)
- net_id = network_creator.create().id
- self.created_object.append(network_creator)
-
- ext_net_name = snaps_utils.get_ext_net_name(self.snaps_creds)
- self.__logger.info("Creating network Router ....")
- router_creator = OpenStackRouter(
- self.snaps_creds, RouterConfig(
- name=abot_router,
- external_gateway=ext_net_name,
- internal_subnets=[subnet_settings.name]))
- router_creator.create()
- self.created_object.append(router_creator)
- self.__logger.info("Creating Flavor ....")
- flavor_settings = FlavorConfig(
- name=self.orchestrator['requirements']['flavor']['name'],
- ram=self.orchestrator['requirements']['flavor']['ram_min'],
- disk=10, vcpus=1)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor_creator.create()
- self.created_object.append(flavor_creator)
-
- self.__logger.info("Upload some OS images if it doesn't exist")
- images = get_config("tenant_images", self.config_file)
- self.__logger.info("Images needed for vEPC: %s", images)
- for image_name, image_file in six.iteritems(images):
- self.__logger.info("image: %s, file: %s", image_name, image_file)
- if image_file and image_name:
- image_creator = OpenStackImage(self.snaps_creds, ImageConfig(
- name=image_name, image_user='cloud', img_format='qcow2',
- image_file=image_file))
- image_id = image_creator.create().id
- cmd = ['juju', 'metadata', 'generate-image', '-d', '/root',
- '-i', image_id, '-s', image_name, '-r',
- self.snaps_creds.region_name if (
- self.snaps_creds.region_name) else 'RegionOne',
- '-u', self.public_auth_url]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- self.created_object.append(image_creator)
- self.__logger.info("Network ID : %s", net_id)
-
+ self._publish_image()
+ self.image_alt = self.publish_image_alt()
+ self.flavor_alt = self.create_flavor_alt()
self.__logger.info("Starting Juju Bootstrap process...")
- try:
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'bootstrap', 'abot-epc', 'abot-controller',
- '--metadata-source', '/root',
- '--constraints', 'mem=2G',
- '--bootstrap-series', 'xenial',
- '--config', 'network={}'.format(net_id),
- '--config', 'ssl-hostname-verification=false',
- '--config', 'use-floating-ip=true',
- '--config', 'use-default-secgroup=true',
- '--debug']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- except subprocess.CalledProcessError as cpe:
- self.__logger.error(
- "Exception with Juju Bootstrap: %s\n%s",
- cpe.cmd, cpe.output)
- return False
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue with Juju Bootstrap ...")
- return False
-
- return True
+ region_name = self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'timeout {JujuEpc.juju_timeout} '
+ f'/snap/bin/juju bootstrap abot-epc/{region_name} abot-controller '
+ '--agent-version 2.3.9 --metadata-source /home/ubuntu '
+ '--constraints mem=2G --bootstrap-series xenial '
+ f'--config network={self.network.id} '
+ '--config ssl-hostname-verification=false '
+ f'--config external-network={self.ext_net.id} '
+ '--config use-floating-ip=true '
+ '--config use-default-secgroup=true '
+ '--debug')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def check_app(self, name='abot-epc-basic', status='active'):
"""Check application status."""
- cmd = ['juju', 'status', '--format', 'short', name]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- ret = re.search(r'(?=workload:({})\))'.format(status), output)
- if ret:
- self.__logger.info("%s workload is %s", name, status)
- return True
- self.__logger.error("%s workload differs from %s", name, status)
- return False
+ for i in range(10):
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'/snap/bin/juju status --format short {name}')
+ output = stdout.read().decode("utf-8")
+ self.__logger.debug("stdout:\n%s", output)
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ continue
+ ret = re.search(
+ rf'(?=workload:({status})\))', output)
+ if ret:
+ self.__logger.info("%s workload is %s", name, status)
+ break
+ self.__logger.info(
+ "loop %d: %s workload differs from %s", i + 1, name, status)
+ time.sleep(60)
+ else:
+ self.__logger.error("%s workload differs from %s", name, status)
+ return False
+ return True
def deploy_vnf(self):
"""Deploy ABOT-OAI-EPC."""
self.__logger.info("Upload VNFD")
- descriptor = self.vnf['descriptor']
- self.__logger.info("Get or create flavor for all Abot-EPC")
- flavor_settings = FlavorConfig(
- name=self.vnf['requirements']['flavor']['name'],
- ram=self.vnf['requirements']['flavor']['ram_min'],
- disk=10,
- vcpus=1)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor_creator.create()
- self.created_object.append(flavor_creator)
-
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(
+ '/src/epc-requirements/abot_charm', remote_path='~/',
+ recursive=True)
self.__logger.info("Deploying Abot-epc bundle file ...")
- cmd = ['juju', 'deploy', '{}'.format(descriptor.get('file_name'))]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- self.__logger.info("Waiting for instances .....")
- try:
- cmd = ['timeout', '-t', JujuEpc.juju_timeout, 'juju-wait']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- self.__logger.info("Deployed Abot-epc on Openstack")
- except subprocess.CalledProcessError as cpe:
- self.__logger.error(
- "Exception with Juju VNF Deployment: %s\n%s",
- cpe.cmd, cpe.output)
- return False
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue with the VNF Deployment ..")
- return False
-
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'sudo mkdir -p /src/epc-requirements && '
+ 'sudo mv abot_charm /src/epc-requirements/abot_charm && '
+ '/snap/bin/juju deploy '
+ '/src/epc-requirements/abot_charm/functest-abot-epc-bundle/'
+ 'bundle.yaml')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'PATH=/snap/bin/:$PATH '
+ f'timeout {JujuEpc.juju_timeout} juju-wait')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
self.__logger.info("Checking status of ABot and EPC units ...")
- cmd = ['juju', 'status']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.debug("%s\n%s", " ".join(cmd), output)
+ (_, stdout, stderr) = self.ssh.exec_command('/snap/bin/juju status')
+ output = stdout.read().decode("utf-8")
+ self.__logger.debug("stdout:\n%s", output)
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
for app in ['abot-epc-basic', 'oai-epc', 'oai-hss']:
if not self.check_app(app):
return False
-
- nova_client = nova_utils.nova_client(self.snaps_creds)
- instances = get_instances(nova_client)
- self.__logger.info("List of Instance: %s", instances)
- for items in instances:
- metadata = get_instance_metadata(nova_client, items)
- if 'juju-units-deployed' in metadata:
- sec_group = 'juju-{}-{}'.format(
- metadata['juju-controller-uuid'],
- metadata['juju-model-uuid'])
- self.__logger.info("Instance: %s", sec_group)
- break
- self.__logger.info("Adding Security group rule....")
- # This will add sctp rule to a common Security Group Created
- # by juju and shared to all deployed units.
- self._add_custom_rule(sec_group)
-
- self.__logger.info("Transferring the feature files to Abot_node ...")
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'scp', '--', '-r', '-v',
- '{}/featureFiles'.format(self.case_dir), 'abot-epc-basic/0:~/']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- self.__logger.info("Copying the feature files within Abot_node ")
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'ssh', 'abot-epc-basic/0',
- 'sudo', 'cp', '-vfR', '~/featureFiles/*',
- '/etc/rebaca-test-suite/featureFiles']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- return True
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(
+ f'{self.case_dir}/featureFiles', remote_path='~/',
+ recursive=True)
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp -- -r -v '
+ '~/featureFiles abot-epc-basic/0:/etc/rebaca-test-suite/')
+ output = stdout.read().decode("utf-8")
+ self.__logger.debug("stdout:\n%s", output)
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def test_vnf(self):
"""Run test on ABoT."""
start_time = time.time()
- self.__logger.info("Running VNF Test cases....")
- cmd = ['juju', 'run-action', 'abot-epc-basic/0', 'run',
- 'tagnames={}'.format(self.details['test_vnf']['tag_name'])]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
- cmd = ['timeout', '-t', JujuEpc.juju_timeout, 'juju-wait']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
-
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "/snap/bin/juju run-action abot-epc-basic/0 "
+ f"run tagnames={self.details['test_vnf']['tag_name']}")
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'PATH=/snap/bin/:$PATH '
+ f'timeout {JujuEpc.juju_timeout} juju-wait')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
duration = time.time() - start_time
self.__logger.info("Getting results from Abot node....")
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'scp', '--', '-v',
- 'abot-epc-basic/0:'
- '/var/lib/abot-epc-basic/artifacts/TestResults.json',
- '{}/.'.format(self.res_dir)]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp '
+ '-- -v abot-epc-basic/0:'
+ '/var/lib/abot-epc-basic/artifacts/TestResults.json .')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.get('TestResults.json', self.res_dir)
self.__logger.info("Parsing the Test results...")
- res = (process_abot_test_result('{}/TestResults.json'.format(
- self.res_dir)))
+ res = process_abot_test_result(f'{self.res_dir}/TestResults.json')
short_result = sig_test_format(res)
self.__logger.info(short_result)
self.details['test_vnf'].update(
@@ -459,99 +362,48 @@ class JujuEpc(vnf.VnfOnBoarding):
short_result['failures'], short_result['skipped'])
return True
- def _get_floating_ips(self):
- """Get the list of floating IPs associated with the current project"""
-
- project_id = self.os_project.get_project().id
-
- neutron_client = neutron_utils.neutron_client(self.snaps_creds)
- floating_ips = neutron_utils.get_floating_ips(neutron_client)
-
- project_floating_ip_list = list()
- for floating_ip in floating_ips:
- if project_id and project_id == floating_ip.project_id:
- project_floating_ip_list.append(floating_ip)
-
- return project_floating_ip_list
-
- def _release_floating_ips(self, fip_list):
- """
- Responsible for deleting a list of floating IPs
- :param fip_list: A list of SNAPS FloatingIp objects
- :return:
- """
- if not fip_list:
- return
-
- neutron_client = neutron_utils.neutron_client(self.snaps_creds)
-
- for floating_ip in fip_list:
- neutron_utils.delete_floating_ip(neutron_client, floating_ip)
-
- def clean(self):
- """Clean created objects/functions."""
-
- # Store Floating IPs of instances created by Juju
- fip_list = self._get_floating_ips()
- self.__logger.info("Floating IPs assigned to project:%s",
- self.os_project.get_project().name)
- for floating_ip in fip_list:
- self.__logger.debug("%s:%s", floating_ip.ip,
- floating_ip.description)
-
+ def execute(self):
+ """Prepare testcase (Additional pre-configuration steps)."""
+ assert self.public_auth_url
+ self.__logger.info("Additional pre-configuration steps")
+ try:
+ os.makedirs(self.res_dir)
+ except OSError as ex:
+ if ex.errno != errno.EEXIST:
+ self.__logger.exception("Cannot create %s", self.res_dir)
+ raise Exception from ex
+ self.__logger.info("ENV:\n%s", env.string())
try:
- cmd = ['juju', 'debug-log', '--replay', '--no-tail']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.debug("%s\n%s", " ".join(cmd), output)
- if not self.orchestrator['requirements']['preserve_setup']:
- self.__logger.info("Destroying Orchestrator...")
- cmd = ['timeout', '-t', JujuEpc.juju_timeout,
- 'juju', 'destroy-controller', '-y', 'abot-controller',
- '--destroy-all-models']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output)
- except subprocess.CalledProcessError as cpe:
- self.__logger.error(
- "Exception with Juju Cleanup: %s\n%s",
- cpe.cmd, cpe.output)
+ assert self._install_juju()
+ assert self._install_juju_wait()
+ assert self._register_cloud()
+ assert self._register_credentials()
+ assert self.deploy_orchestrator()
+ assert self.deploy_vnf()
+ assert self.test_vnf()
except Exception: # pylint: disable=broad-except
- self.__logger.exception("General issue during the undeployment ..")
-
- if not self.orchestrator['requirements']['preserve_setup']:
- try:
- self.__logger.info('Release floating IPs assigned by Juju...')
- self._release_floating_ips(fip_list)
- except Exception: # pylint: disable=broad-except
- self.__logger.exception(
- "Exception while releasing floating IPs ...")
+ self.__logger.exception("juju_epc failed")
+ return 1
+ return 0
- self.__logger.info('Remove the Abot_epc OS objects ..')
- super(JujuEpc, self).clean()
-
- return True
-
-
-# ----------------------------------------------------------
-#
-# YAML UTILS
-#
-# -----------------------------------------------------------
-def get_config(parameter, file_path):
- """
- Returns the value of a given parameter in file.yaml
- parameter must be given in string format with dots
- Example: general.openstack.image_name
- """
- with open(file_path) as config_file:
- file_yaml = yaml.safe_load(config_file)
- config_file.close()
- value = file_yaml
- for element in parameter.split("."):
- value = value.get(element)
- if value is None:
- raise ValueError("The parameter %s is not defined in"
- " reporting.yaml" % parameter)
- return value
+ def clean(self):
+ """Clean created objects/functions."""
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju debug-log --replay --no-tail')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju destroy-controller -y abot-controller '
+ '--destroy-all-models')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ for fip in self.cloud.list_floating_ips():
+ self.cloud.delete_floating_ip(fip.id)
+ if self.image_alt:
+ self.cloud.delete_image(self.image_alt)
+ if self.flavor_alt:
+ self.orig_cloud.delete_flavor(self.flavor_alt.id)
+ super().clean()
def sig_test_format(sig_test):
@@ -577,7 +429,7 @@ def sig_test_format(sig_test):
def process_abot_test_result(file_path):
""" Process ABoT Result """
- with open(file_path) as test_result:
+ with open(file_path, encoding='utf-8') as test_result:
data = json.load(test_result)
res = []
for tests in data:
@@ -629,23 +481,3 @@ def update_data(obj):
raise
return obj
-
-
-def get_instances(nova_client):
- """ To get all vm info of a project """
- try:
- instances = nova_client.servers.list()
- return instances
- except Exception as exc: # pylint: disable=broad-except
- logging.error("Error [get_instances(nova_client)]: %s", exc)
- return None
-
-
-def get_instance_metadata(nova_client, instance):
- """ Get instance Metadata - Instance ID """
- try:
- instance = nova_client.servers.get(instance.id)
- return instance.metadata
- except Exception as exc: # pylint: disable=broad-except
- logging.error("Error [get_instance_status(nova_client)]: %s", exc)
- return None