diff options
-rw-r--r-- | functest/opnfv_tests/vnf/epc/juju_epc.py | 74 |
1 files changed, 64 insertions, 10 deletions
diff --git a/functest/opnfv_tests/vnf/epc/juju_epc.py b/functest/opnfv_tests/vnf/epc/juju_epc.py index 75d3157b7..12c6d2398 100644 --- a/functest/opnfv_tests/vnf/epc/juju_epc.py +++ b/functest/opnfv_tests/vnf/epc/juju_epc.py @@ -37,6 +37,7 @@ from snaps.openstack.create_security_group import OpenStackSecurityGroup from snaps.openstack.create_user import OpenStackUser from snaps.openstack.utils import keystone_utils from snaps.openstack.utils import nova_utils +from snaps.openstack.utils import neutron_utils from functest.core import vnf from functest.opnfv_tests.openstack.snaps import snaps_utils @@ -195,18 +196,25 @@ class JujuEpc(vnf.VnfOnBoarding): def _add_custom_rule(self, sec_grp_name): """ To add custom rule for SCTP Traffic """ - sec_grp_rules = list() - sec_grp_rules.append( - SecurityGroupRuleConfig( - sec_grp_name=sec_grp_name, direction=Direction.ingress, - protocol=Protocol.sctp)) + security_group = OpenStackSecurityGroup( self.snaps_creds, SecurityGroupConfig( - name=sec_grp_name, - rule_settings=sec_grp_rules)) + name=sec_grp_name)) + security_group.create() - self.created_object.append(security_group) + + # Add custom security rule to the obtained Security Group + self.__logger.info("Adding SCTP ingress rule to SG:%s", + security_group.sec_grp_settings.name) + + try: + security_group.add_rule(SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.sctp)) + except Exception: # pylint: disable=broad-except + self.__logger.exception( + "Some issue encountered with adding SCTP security rule ...") def prepare(self): """Prepare testcase (Additional pre-configuration steps).""" @@ -353,6 +361,7 @@ class JujuEpc(vnf.VnfOnBoarding): flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings) flavor_creator.create() self.created_object.append(flavor_creator) + self.__logger.info("Deploying Abot-epc bundle file ...") cmd = ['juju', 'deploy', '{}'.format(descriptor.get('file_name'))] output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) @@ -448,8 +457,46 @@ class JujuEpc(vnf.VnfOnBoarding): short_result['failures'], short_result['skipped']) return True + def _get_floating_ips(self): + """Get the list of floating IPs associated with the current project""" + + project_id = self.os_project.get_project().id + + neutron_client = neutron_utils.neutron_client(self.snaps_creds) + floating_ips = neutron_utils.get_floating_ips(neutron_client) + + project_floating_ip_list = list() + for floating_ip in floating_ips: + if project_id and project_id == floating_ip.project_id: + project_floating_ip_list.append(floating_ip) + + return project_floating_ip_list + + def _release_floating_ips(self, fip_list): + """ + Responsible for deleting a list of floating IPs + :param fip_list: A list of SNAPS FloatingIp objects + :return: + """ + if not fip_list: + return + + neutron_client = neutron_utils.neutron_client(self.snaps_creds) + + for floating_ip in fip_list: + neutron_utils.delete_floating_ip(neutron_client, floating_ip) + def clean(self): """Clean created objects/functions.""" + + # Store Floating IPs of instances created by Juju + fip_list = self._get_floating_ips() + self.__logger.info("Floating IPs assigned to project:%s", + self.os_project.get_project().name) + for floating_ip in fip_list: + self.__logger.debug("%s:%s", floating_ip.ip, + floating_ip.description) + try: cmd = ['juju', 'debug-log', '--replay', '--no-tail'] output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) @@ -469,7 +516,14 @@ class JujuEpc(vnf.VnfOnBoarding): self.__logger.exception("General issue during the undeployment ..") if not self.orchestrator['requirements']['preserve_setup']: - self.__logger.info('Remove the Abot_epc OS object ..') + try: + self.__logger.info('Release floating IPs assigned by Juju...') + self._release_floating_ips(fip_list) + except Exception: # pylint: disable=broad-except + self.__logger.exception( + "Exception while releasing floating IPs ...") + + self.__logger.info('Remove the Abot_epc OS objects ..') super(JujuEpc, self).clean() return True @@ -531,7 +585,7 @@ def process_abot_test_result(file_path): for steps in flatten_steps: steps['result'] = steps['step_status'] res.append(steps) - except: + except Exception: # pylint: disable=broad-except logging.error("Could not post data to ElasticSearch host") raise return res |