diff options
Diffstat (limited to 'functest/opnfv_tests/vnf')
10 files changed, 106 insertions, 120 deletions
diff --git a/functest/opnfv_tests/vnf/epc/juju_epc.py b/functest/opnfv_tests/vnf/epc/juju_epc.py index 5049bd0bb..1cf240b80 100644 --- a/functest/opnfv_tests/vnf/epc/juju_epc.py +++ b/functest/opnfv_tests/vnf/epc/juju_epc.py @@ -83,16 +83,16 @@ class JujuEpc(singlevm.SingleVm2): def __init__(self, **kwargs): if "case_name" not in kwargs: kwargs["case_name"] = "juju_epc" - super(JujuEpc, self).__init__(**kwargs) + super().__init__(**kwargs) # Retrieve the configuration self.case_dir = pkg_resources.resource_filename( 'functest', 'opnfv_tests/vnf/epc') try: self.config = getattr( - config.CONF, 'vnf_{}_config'.format(self.case_name)) - except Exception: - raise Exception("VNF config file not found") + config.CONF, f'vnf_{self.case_name}_config') + except Exception as exc: + raise Exception("VNF config file not found") from exc self.config_file = os.path.join(self.case_dir, self.config) self.orchestrator = dict( requirements=functest_utils.get_parameter_from_yaml( @@ -138,7 +138,7 @@ class JujuEpc(singlevm.SingleVm2): try: self.public_auth_url = self.get_public_auth_url(self.orig_cloud) if not self.public_auth_url.endswith(('v3', 'v3/')): - self.public_auth_url = "{}/v3".format(self.public_auth_url) + self.public_auth_url = f"{self.public_auth_url}/v3" except Exception: # pylint: disable=broad-except self.public_auth_url = None self.sec = None @@ -168,7 +168,7 @@ class JujuEpc(singlevm.SingleVm2): 'url': self.public_auth_url, 'region': self.cloud.region_name if self.cloud.region_name else ( 'RegionOne')} - with open(clouds_yaml, 'w') as yfile: + with open(clouds_yaml, 'w', encoding='utf-8') as yfile: yfile.write(CLOUD_TEMPLATE.format(**cloud_data)) scpc = scp.SCPClient(self.ssh.get_transport()) scpc.put(clouds_yaml, remote_path='~/') @@ -189,7 +189,7 @@ class JujuEpc(singlevm.SingleVm2): "project_domain_name", "Default"), 'user_domain_n': self.cloud.auth.get( "user_domain_name", "Default")} - with open(credentials_yaml, 'w') as yfile: + with open(credentials_yaml, 'w', encoding='utf-8') as yfile: yfile.write(CREDS_TEMPLATE.format(**creds_data)) scpc = scp.SCPClient(self.ssh.get_transport()) scpc.put(credentials_yaml, remote_path='~/') @@ -205,20 +205,20 @@ class JujuEpc(singlevm.SingleVm2): 'RegionOne') (_, stdout, stderr) = self.ssh.exec_command( '/snap/bin/juju metadata generate-image -d /home/ubuntu ' - '-i {} -s xenial -r {} -u {}'.format( - self.image.id, region_name, self.public_auth_url)) + f'-i {self.image.id} -s xenial -r {region_name} ' + f'-u {self.public_auth_url}') self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8")) self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8")) return not stdout.channel.recv_exit_status() def publish_image_alt(self, name=None): - image_alt = super(JujuEpc, self).publish_image_alt(name) + image_alt = super().publish_image_alt(name) region_name = self.cloud.region_name if self.cloud.region_name else ( 'RegionOne') (_, stdout, stderr) = self.ssh.exec_command( '/snap/bin/juju metadata generate-image -d /home/ubuntu ' - '-i {} -s trusty -r {} -u {}'.format( - image_alt.id, region_name, self.public_auth_url)) + f'-i {image_alt.id} -s trusty -r {region_name} ' + f'-u {self.public_auth_url}') self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8")) self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8")) return image_alt @@ -236,18 +236,16 @@ class JujuEpc(singlevm.SingleVm2): region_name = self.cloud.region_name if self.cloud.region_name else ( 'RegionOne') (_, stdout, stderr) = self.ssh.exec_command( - 'timeout {} ' - '/snap/bin/juju bootstrap abot-epc/{} abot-controller ' + f'timeout {JujuEpc.juju_timeout} ' + f'/snap/bin/juju bootstrap abot-epc/{region_name} abot-controller ' '--agent-version 2.3.9 --metadata-source /home/ubuntu ' '--constraints mem=2G --bootstrap-series xenial ' - '--config network={} ' + f'--config network={self.network.id} ' '--config ssl-hostname-verification=false ' - '--config external-network={} ' + f'--config external-network={self.ext_net.id} ' '--config use-floating-ip=true ' '--config use-default-secgroup=true ' - '--debug'.format( - JujuEpc.juju_timeout, region_name, self.network.id, - self.ext_net.id)) + '--debug') self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8")) self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8")) return not stdout.channel.recv_exit_status() @@ -256,14 +254,14 @@ class JujuEpc(singlevm.SingleVm2): """Check application status.""" for i in range(10): (_, stdout, stderr) = self.ssh.exec_command( - '/snap/bin/juju status --format short {}'.format(name)) + f'/snap/bin/juju status --format short {name}') output = stdout.read().decode("utf-8") self.__logger.debug("stdout:\n%s", output) self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8")) if stdout.channel.recv_exit_status(): continue ret = re.search( - r'(?=workload:({})\))'.format(status), output) + rf'(?=workload:({status})\))', output) if ret: self.__logger.info("%s workload is %s", name, status) break @@ -295,7 +293,7 @@ class JujuEpc(singlevm.SingleVm2): return not stdout.channel.recv_exit_status() (_, stdout, stderr) = self.ssh.exec_command( 'PATH=/snap/bin/:$PATH ' - 'timeout {} juju-wait'.format(JujuEpc.juju_timeout)) + f'timeout {JujuEpc.juju_timeout} juju-wait') self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8")) self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8")) if stdout.channel.recv_exit_status(): @@ -312,12 +310,11 @@ class JujuEpc(singlevm.SingleVm2): return False scpc = scp.SCPClient(self.ssh.get_transport()) scpc.put( - '{}/featureFiles'.format(self.case_dir), remote_path='~/', + f'{self.case_dir}/featureFiles', remote_path='~/', recursive=True) (_, stdout, stderr) = self.ssh.exec_command( - 'timeout {} /snap/bin/juju scp -- -r -v ~/featureFiles ' - 'abot-epc-basic/0:/etc/rebaca-test-suite/'.format( - JujuEpc.juju_timeout)) + f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp -- -r -v ' + '~/featureFiles abot-epc-basic/0:/etc/rebaca-test-suite/') output = stdout.read().decode("utf-8") self.__logger.debug("stdout:\n%s", output) self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8")) @@ -327,15 +324,15 @@ class JujuEpc(singlevm.SingleVm2): """Run test on ABoT.""" start_time = time.time() (_, stdout, stderr) = self.ssh.exec_command( - '/snap/bin/juju run-action abot-epc-basic/0 ' - 'run tagnames={}'.format(self.details['test_vnf']['tag_name'])) + "/snap/bin/juju run-action abot-epc-basic/0 " + f"run tagnames={self.details['test_vnf']['tag_name']}") self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8")) self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8")) if stdout.channel.recv_exit_status(): return not stdout.channel.recv_exit_status() (_, stdout, stderr) = self.ssh.exec_command( 'PATH=/snap/bin/:$PATH ' - 'timeout {} juju-wait'.format(JujuEpc.juju_timeout)) + f'timeout {JujuEpc.juju_timeout} juju-wait') self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8")) self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8")) if stdout.channel.recv_exit_status(): @@ -343,9 +340,9 @@ class JujuEpc(singlevm.SingleVm2): duration = time.time() - start_time self.__logger.info("Getting results from Abot node....") (_, stdout, stderr) = self.ssh.exec_command( - 'timeout {} /snap/bin/juju scp -- -v abot-epc-basic/0:' - '/var/lib/abot-epc-basic/artifacts/TestResults.json .'.format( - JujuEpc.juju_timeout)) + f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp ' + '-- -v abot-epc-basic/0:' + '/var/lib/abot-epc-basic/artifacts/TestResults.json .') self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8")) self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8")) if stdout.channel.recv_exit_status(): @@ -353,8 +350,7 @@ class JujuEpc(singlevm.SingleVm2): scpc = scp.SCPClient(self.ssh.get_transport()) scpc.get('TestResults.json', self.res_dir) self.__logger.info("Parsing the Test results...") - res = (process_abot_test_result('{}/TestResults.json'.format( - self.res_dir))) + res = process_abot_test_result(f'{self.res_dir}/TestResults.json') short_result = sig_test_format(res) self.__logger.info(short_result) self.details['test_vnf'].update( @@ -375,7 +371,7 @@ class JujuEpc(singlevm.SingleVm2): except OSError as ex: if ex.errno != errno.EEXIST: self.__logger.exception("Cannot create %s", self.res_dir) - raise Exception + raise Exception from ex self.__logger.info("ENV:\n%s", env.string()) try: assert self._install_juju() @@ -407,7 +403,7 @@ class JujuEpc(singlevm.SingleVm2): self.cloud.delete_image(self.image_alt) if self.flavor_alt: self.orig_cloud.delete_flavor(self.flavor_alt.id) - super(JujuEpc, self).clean() + super().clean() def sig_test_format(sig_test): @@ -433,7 +429,7 @@ def sig_test_format(sig_test): def process_abot_test_result(file_path): """ Process ABoT Result """ - with open(file_path) as test_result: + with open(file_path, encoding='utf-8') as test_result: data = json.load(test_result) res = [] for tests in data: diff --git a/functest/opnfv_tests/vnf/ims/clearwater.py b/functest/opnfv_tests/vnf/ims/clearwater.py index 67128b11c..4c143fd70 100644 --- a/functest/opnfv_tests/vnf/ims/clearwater.py +++ b/functest/opnfv_tests/vnf/ims/clearwater.py @@ -50,7 +50,7 @@ class ClearwaterTesting(): output_dict = {} self.logger.debug('Ellis IP: %s', self.ellis_ip) output_dict['ellis_ip'] = self.ellis_ip - account_url = 'http://{0}/accounts'.format(self.ellis_ip) + account_url = f'http://{self.ellis_ip}/accounts' params = {"password": "functest", "full_name": "opnfv functest user", "email": "functest@opnfv.org", @@ -60,7 +60,7 @@ class ClearwaterTesting(): number_res = self._create_ellis_account(account_url, params) output_dict['number'] = number_res - session_url = 'http://{0}/session'.format(self.ellis_ip) + session_url = f'http://{self.ellis_ip}/session' session_data = { 'username': params['email'], 'password': params['password'], @@ -68,8 +68,8 @@ class ClearwaterTesting(): } cookies = self._get_ellis_session_cookies(session_url, session_data) - number_url = 'http://{0}/accounts/{1}/numbers'.format( - self.ellis_ip, params['email']) + number_url = ( + f"http://{self.ellis_ip}/accounts/{params['email']}/numbers") self.logger.debug('Create 1st calling number on Ellis') number_res = self._create_ellis_number(number_url, cookies) @@ -97,8 +97,7 @@ class ClearwaterTesting(): "try %s: cannot create ellis account", iloop + 1) time.sleep(30) raise Exception( - "Unable to create an account {}".format( - params.get('full_name'))) + f"Unable to create an account {params.get('full_name')}") def _get_ellis_session_cookies(self, session_url, params): i = 15 @@ -150,24 +149,20 @@ class ClearwaterTesting(): """ # pylint: disable=too-many-locals,too-many-arguments self.logger.info('Run Clearwater live test') - script = ('cd {0};' - 'rake test[{1}] SIGNUP_CODE={2}' - .format(self.test_dir, - public_domain, - signup_code)) + script = (f'cd {self.test_dir};' + f'rake test[{public_domain}] SIGNUP_CODE={signup_code}') if self.bono_ip and self.ellis_ip: - subscript = ' PROXY={0} ELLIS={1}'.format( - self.bono_ip, self.ellis_ip) - script = '{0}{1}'.format(script, subscript) - script = ('{0}{1}'.format(script, ' --trace')) - cmd = "/bin/bash -c '{0}'".format(script) + subscript = f' PROXY={self.bono_ip} ELLIS={self.ellis_ip}' + script = f'{script}{subscript}' + script = f'{script} --trace' + cmd = f"/bin/sh -c '{script}'" self.logger.debug('Live test cmd: %s', cmd) output_file = os.path.join(self.result_dir, "ims_test_output.txt") ft_utils.execute_command(cmd, error_msg='Clearwater live test failed', output_file=output_file) - with open(output_file, 'r') as ofile: + with open(output_file, 'r', encoding='utf-8') as ofile: result = ofile.read() if result != "": diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.py b/functest/opnfv_tests/vnf/ims/cloudify_ims.py index d937cc052..b93af7d6d 100644 --- a/functest/opnfv_tests/vnf/ims/cloudify_ims.py +++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.py @@ -53,14 +53,14 @@ class CloudifyIms(cloudify.Cloudify): """Initialize CloudifyIms testcase object.""" if "case_name" not in kwargs: kwargs["case_name"] = "cloudify_ims" - super(CloudifyIms, self).__init__(**kwargs) + super().__init__(**kwargs) # Retrieve the configuration try: self.config = getattr( - config.CONF, 'vnf_{}_config'.format(self.case_name)) - except Exception: - raise Exception("VNF config file not found") + config.CONF, f'vnf_{self.case_name}_config') + except Exception as exc: + raise Exception("VNF config file not found") from exc self.case_dir = pkg_resources.resource_filename( 'functest', 'opnfv_tests/vnf/ims') @@ -114,7 +114,7 @@ class CloudifyIms(cloudify.Cloudify): network, security group, fip, VM creation """ - assert super(CloudifyIms, self).execute() == 0 + assert super().execute() == 0 start_time = time.time() self.orig_cloud.set_network_quotas( self.project.project.name, @@ -259,4 +259,4 @@ class CloudifyIms(cloudify.Cloudify): self.cloud.delete_image(self.image_alt) if self.flavor_alt: self.orig_cloud.delete_flavor(self.flavor_alt.id) - super(CloudifyIms, self).clean() + super().clean() diff --git a/functest/opnfv_tests/vnf/ims/heat_ims.py b/functest/opnfv_tests/vnf/ims/heat_ims.py index 4edb2ddd9..0d4e345a0 100644 --- a/functest/opnfv_tests/vnf/ims/heat_ims.py +++ b/functest/opnfv_tests/vnf/ims/heat_ims.py @@ -57,14 +57,14 @@ class HeatIms(singlevm.VmReady2): """Initialize HeatIms testcase object.""" if "case_name" not in kwargs: kwargs["case_name"] = "heat_ims" - super(HeatIms, self).__init__(**kwargs) + super().__init__(**kwargs) # Retrieve the configuration try: self.config = getattr( - config.CONF, 'vnf_{}_config'.format(self.case_name)) - except Exception: - raise Exception("VNF config file not found") + config.CONF, f'vnf_{self.case_name}_config') + except Exception as exc: + raise Exception("VNF config file not found") from exc self.case_dir = pkg_resources.resource_filename( 'functest', 'opnfv_tests/vnf/ims') @@ -112,9 +112,10 @@ class HeatIms(singlevm.VmReady2): project=self.project.project.id, domain=self.project.domain.id) self.keypair = self.cloud.create_keypair( - '{}-kp_{}'.format(self.case_name, self.guid)) + f'{self.case_name}-kp_{self.guid}') self.__logger.info("keypair:\n%s", self.keypair.private_key) - with open(self.key_filename, 'w') as private_key_file: + with open( + self.key_filename, 'w', encoding='utf-8') as private_key_file: private_key_file.write(self.keypair.private_key) if self.deploy_vnf() and self.test_vnf(): @@ -137,7 +138,7 @@ class HeatIms(singlevm.VmReady2): status = testcase.TestCase.EX_RUN_ERROR try: assert self.cloud - assert super(HeatIms, self).run( + assert super().run( **kwargs) == testcase.TestCase.EX_OK self.result = 0 if not self.execute(): @@ -247,6 +248,6 @@ class HeatIms(singlevm.VmReady2): pass except Exception: # pylint: disable=broad-except self.__logger.exception("Cannot clean stack ressources") - super(HeatIms, self).clean() + super().clean() if self.role: self.orig_cloud.delete_role(self.role.id) diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py index 82f57ca7b..32d675347 100644 --- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py +++ b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py @@ -51,14 +51,14 @@ class CloudifyVrouter(cloudify.Cloudify): def __init__(self, **kwargs): if "case_name" not in kwargs: kwargs["case_name"] = "vyos_vrouter" - super(CloudifyVrouter, self).__init__(**kwargs) + super().__init__(**kwargs) # Retrieve the configuration try: self.config = getattr( - config.CONF, 'vnf_{}_config'.format(self.case_name)) - except Exception: - raise Exception("VNF config file not found") + config.CONF, f'vnf_{self.case_name}_config') + except Exception as exc: + raise Exception("VNF config file not found") from exc self.case_dir = pkg_resources.resource_filename( 'functest', 'opnfv_tests/vnf/router') @@ -127,7 +127,7 @@ class CloudifyVrouter(cloudify.Cloudify): network, security group, fip, VM creation """ # network creation - super(CloudifyVrouter, self).execute() + super().execute() start_time = time.time() self.put_private_key() self.upload_cfy_plugins(self.cop_yaml, self.cop_wgn) @@ -231,4 +231,4 @@ class CloudifyVrouter(cloudify.Cloudify): self.cloud.delete_image(self.image_alt) if self.flavor_alt: self.orig_cloud.delete_flavor(self.flavor_alt.id) - super(CloudifyVrouter, self).clean() + super().clean() diff --git a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py index 0a56913b7..9eb3c5d69 100644 --- a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py +++ b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py @@ -32,17 +32,16 @@ class FunctionTestExec(): credentials = util_info["credentials"] self.vnf_ctrl = VnfController(util_info) - test_cmd_map_file = open( - os.path.join( - self.util.vnf_data_dir, self.util.command_template_dir, - self.util.test_cmd_map_yaml_file), - 'r') - self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file) - test_cmd_map_file.close() + with open( + os.path.join( + self.util.vnf_data_dir, self.util.command_template_dir, + self.util.test_cmd_map_yaml_file), + 'r', encoding='utf-8') as test_cmd_map_file: + self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file) self.util.set_credentials(credentials["cloud"]) - with open(self.util.test_env_config_yaml) as file_fd: + with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd: test_env_config_yaml = yaml.safe_load(file_fd) file_fd.close() diff --git a/functest/opnfv_tests/vnf/router/utilvnf.py b/functest/opnfv_tests/vnf/router/utilvnf.py index 2db3b38e5..111f20c1a 100644 --- a/functest/opnfv_tests/vnf/router/utilvnf.py +++ b/functest/opnfv_tests/vnf/router/utilvnf.py @@ -64,7 +64,7 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes if not os.path.exists(self.vnf_data_dir): os.makedirs(self.vnf_data_dir) - with open(self.test_env_config_yaml) as file_fd: + with open(self.test_env_config_yaml, encoding='utf-8') as file_fd: test_env_config_yaml = yaml.safe_load(file_fd) file_fd.close() @@ -98,9 +98,7 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes return mac_address def get_blueprint_outputs(self, cfy_manager_ip, deployment_name): - url = "http://%s/deployments/%s/outputs" % ( - cfy_manager_ip, deployment_name) - + url = f"http://{cfy_manager_ip}/deployments/{deployment_name}/outputs" response = requests.get( url, auth=requests.auth.HTTPBasicAuth('admin', 'admin'), @@ -212,24 +210,29 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes def write_result_data(self, result_data): test_result = [] if not os.path.isfile(self.test_result_json_file): - file_fd = open(self.test_result_json_file, "w") - file_fd.close() + with open( + self.test_result_json_file, "w", + encoding="utf-8") as file_fd: + pass else: - file_fd = open(self.test_result_json_file, "r") - test_result = json.load(file_fd) - file_fd.close() + with open( + self.test_result_json_file, "r", + encoding="utf-8") as file_fd: + test_result = json.load(file_fd) test_result.append(result_data) - file_fd = open(self.test_result_json_file, "w") - json.dump(test_result, file_fd) - file_fd.close() + with open( + self.test_result_json_file, "w", + encoding="utf-8") as file_fd: + json.dump(test_result, file_fd) def output_test_result_json(self): if os.path.isfile(self.test_result_json_file): - file_fd = open(self.test_result_json_file, "r") - test_result = json.load(file_fd) - file_fd.close() + with open( + self.test_result_json_file, "r", + encoding="utf-8") as file_fd: + test_result = json.load(file_fd) output_json_data = json.dumps(test_result, sort_keys=True, indent=4) @@ -239,8 +242,6 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes @staticmethod def get_test_scenario(file_path): - test_scenario_file = open(file_path, - 'r') - test_scenario_yaml = yaml.safe_load(test_scenario_file) - test_scenario_file.close() + with open(file_path, "r", encoding="utf-8") as test_scenario_file: + test_scenario_yaml = yaml.safe_load(test_scenario_file) return test_scenario_yaml["test_scenario_list"] diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py index 0969eab3b..269f6526b 100644 --- a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py +++ b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py @@ -43,7 +43,7 @@ class SshClient(): # pylint: disable=too-many-instance-attributes self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.util = Utilvnf() - with open(self.util.test_env_config_yaml) as file_fd: + with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd: test_env_config_yaml = yaml.safe_load(file_fd) file_fd.close() diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py index b159ddda4..2210b3909 100644 --- a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py +++ b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py @@ -36,7 +36,7 @@ class VmController(): self.util = Utilvnf() self.util.set_credentials(credentials["cloud"]) - with open(self.util.test_env_config_yaml) as file_fd: + with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd: test_env_config_yaml = yaml.safe_load(file_fd) file_fd.close() @@ -101,10 +101,8 @@ class VmController(): def command_create_and_execute(self, ssh, test_cmd_file_path, cmd_input_param, prompt_file_path): - prompt_file = open(prompt_file_path, - 'r') - prompt = yaml.safe_load(prompt_file) - prompt_file.close() + with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file: + prompt = yaml.safe_load(prompt_file) config_mode_prompt = prompt["config_mode"] commands = self.command_gen_from_template(test_cmd_file_path, diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py index 7ed287c6e..46584456f 100644 --- a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py +++ b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py @@ -36,7 +36,7 @@ class VnfController(): self.util = Utilvnf() self.vm_controller = VmController(util_info) - with open(self.util.test_env_config_yaml) as file_fd: + with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd: test_env_config_yaml = yaml.safe_load(file_fd) file_fd.close() @@ -49,10 +49,9 @@ class VnfController(): def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path, parameter_file_path, prompt_file_path): # pylint: disable=too-many-arguments - parameter_file = open(parameter_file_path, - 'r') - cmd_input_param = yaml.safe_load(parameter_file) - parameter_file.close() + with open( + parameter_file_path, 'r', encoding='utf-8') as parameter_file: + cmd_input_param = yaml.safe_load(parameter_file) cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"] cmd_input_param["source_ip"] = source_vnf["data_plane_network_ip"] @@ -71,19 +70,16 @@ class VnfController(): res_dict_data_list = [] - parameter_file = open(parameter_file_path, - 'r') - cmd_input_param = yaml.safe_load(parameter_file) - parameter_file.close() + with open( + parameter_file_path, 'r', encoding='utf-8') as parameter_file: + cmd_input_param = yaml.safe_load(parameter_file) cmd_input_param["source_ip"] = target_vnf["data_plane_network_ip"] cmd_input_param["destination_ip"] = reference_vnf[ "data_plane_network_ip"] - prompt_file = open(prompt_file_path, - 'r') - prompt = yaml.safe_load(prompt_file) - prompt_file.close() + with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file: + prompt = yaml.safe_load(prompt_file) terminal_mode_prompt = prompt["terminal_mode"] ssh = SshClient(target_vnf["floating_ip"], |