summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--functest/opnfv_tests/vnf/epc/juju_epc.py74
1 files changed, 64 insertions, 10 deletions
diff --git a/functest/opnfv_tests/vnf/epc/juju_epc.py b/functest/opnfv_tests/vnf/epc/juju_epc.py
index 75d3157b7..12c6d2398 100644
--- a/functest/opnfv_tests/vnf/epc/juju_epc.py
+++ b/functest/opnfv_tests/vnf/epc/juju_epc.py
@@ -37,6 +37,7 @@ from snaps.openstack.create_security_group import OpenStackSecurityGroup
from snaps.openstack.create_user import OpenStackUser
from snaps.openstack.utils import keystone_utils
from snaps.openstack.utils import nova_utils
+from snaps.openstack.utils import neutron_utils
from functest.core import vnf
from functest.opnfv_tests.openstack.snaps import snaps_utils
@@ -195,18 +196,25 @@ class JujuEpc(vnf.VnfOnBoarding):
def _add_custom_rule(self, sec_grp_name):
""" To add custom rule for SCTP Traffic """
- sec_grp_rules = list()
- sec_grp_rules.append(
- SecurityGroupRuleConfig(
- sec_grp_name=sec_grp_name, direction=Direction.ingress,
- protocol=Protocol.sctp))
+
security_group = OpenStackSecurityGroup(
self.snaps_creds,
SecurityGroupConfig(
- name=sec_grp_name,
- rule_settings=sec_grp_rules))
+ name=sec_grp_name))
+
security_group.create()
- self.created_object.append(security_group)
+
+ # Add custom security rule to the obtained Security Group
+ self.__logger.info("Adding SCTP ingress rule to SG:%s",
+ security_group.sec_grp_settings.name)
+
+ try:
+ security_group.add_rule(SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.sctp))
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception(
+ "Some issue encountered with adding SCTP security rule ...")
def prepare(self):
"""Prepare testcase (Additional pre-configuration steps)."""
@@ -353,6 +361,7 @@ class JujuEpc(vnf.VnfOnBoarding):
flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
flavor_creator.create()
self.created_object.append(flavor_creator)
+
self.__logger.info("Deploying Abot-epc bundle file ...")
cmd = ['juju', 'deploy', '{}'.format(descriptor.get('file_name'))]
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
@@ -448,8 +457,46 @@ class JujuEpc(vnf.VnfOnBoarding):
short_result['failures'], short_result['skipped'])
return True
+ def _get_floating_ips(self):
+ """Get the list of floating IPs associated with the current project"""
+
+ project_id = self.os_project.get_project().id
+
+ neutron_client = neutron_utils.neutron_client(self.snaps_creds)
+ floating_ips = neutron_utils.get_floating_ips(neutron_client)
+
+ project_floating_ip_list = list()
+ for floating_ip in floating_ips:
+ if project_id and project_id == floating_ip.project_id:
+ project_floating_ip_list.append(floating_ip)
+
+ return project_floating_ip_list
+
+ def _release_floating_ips(self, fip_list):
+ """
+ Responsible for deleting a list of floating IPs
+ :param fip_list: A list of SNAPS FloatingIp objects
+ :return:
+ """
+ if not fip_list:
+ return
+
+ neutron_client = neutron_utils.neutron_client(self.snaps_creds)
+
+ for floating_ip in fip_list:
+ neutron_utils.delete_floating_ip(neutron_client, floating_ip)
+
def clean(self):
"""Clean created objects/functions."""
+
+ # Store Floating IPs of instances created by Juju
+ fip_list = self._get_floating_ips()
+ self.__logger.info("Floating IPs assigned to project:%s",
+ self.os_project.get_project().name)
+ for floating_ip in fip_list:
+ self.__logger.debug("%s:%s", floating_ip.ip,
+ floating_ip.description)
+
try:
cmd = ['juju', 'debug-log', '--replay', '--no-tail']
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
@@ -469,7 +516,14 @@ class JujuEpc(vnf.VnfOnBoarding):
self.__logger.exception("General issue during the undeployment ..")
if not self.orchestrator['requirements']['preserve_setup']:
- self.__logger.info('Remove the Abot_epc OS object ..')
+ try:
+ self.__logger.info('Release floating IPs assigned by Juju...')
+ self._release_floating_ips(fip_list)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception(
+ "Exception while releasing floating IPs ...")
+
+ self.__logger.info('Remove the Abot_epc OS objects ..')
super(JujuEpc, self).clean()
return True
@@ -531,7 +585,7 @@ def process_abot_test_result(file_path):
for steps in flatten_steps:
steps['result'] = steps['step_status']
res.append(steps)
- except:
+ except Exception: # pylint: disable=broad-except
logging.error("Could not post data to ElasticSearch host")
raise
return res