diff options
author | Cédric Ollivier <cedric.ollivier@orange.com> | 2018-02-27 14:25:49 +0100 |
---|---|---|
committer | Cédric Ollivier <cedric.ollivier@orange.com> | 2018-02-27 14:27:57 +0100 |
commit | baa8f2d5f67d45e5761f92cb93fe22050f08d0fe (patch) | |
tree | 05ddb33dc893cad35369b3286db944eac79ffe4d /functest/opnfv_tests/vnf/router | |
parent | 53cd7f8176c996014decb7311d9f546f6b8f2497 (diff) |
Clean all OpenStack related modules
Xtesting is only focused on the framework and entry points.
Change-Id: I1a4146ed8519438b13810a20ddf1140c35bb6ecd
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
Diffstat (limited to 'functest/opnfv_tests/vnf/router')
14 files changed, 0 insertions, 1663 deletions
diff --git a/functest/opnfv_tests/vnf/router/__init__.py b/functest/opnfv_tests/vnf/router/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/opnfv_tests/vnf/router/__init__.py +++ /dev/null diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py deleted file mode 100644 index 18acce6f..00000000 --- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py +++ /dev/null @@ -1,499 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Okinawa Open Laboratory and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -"""vrouter testcase implementation.""" - -import logging -import os -import time -import uuid - -from cloudify_rest_client import CloudifyClient -from cloudify_rest_client.executions import Execution -from scp import SCPClient - -from functest.opnfv_tests.openstack.snaps import snaps_utils -import functest.opnfv_tests.vnf.router.vrouter_base as vrouter_base -from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf -from functest.utils import config -from functest.utils import functest_utils - -from git import Repo - -from snaps.config.flavor import FlavorConfig -from snaps.config.image import ImageConfig -from snaps.config.keypair import KeypairConfig -from snaps.config.network import NetworkConfig, PortConfig, SubnetConfig -from snaps.config.router import RouterConfig -from snaps.config.security_group import ( - Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig) -from snaps.config.user import UserConfig -from snaps.config.vm_inst import FloatingIpConfig, VmInstanceConfig - -from snaps.openstack.create_flavor import OpenStackFlavor -from snaps.openstack.create_image import OpenStackImage -from snaps.openstack.create_instance import OpenStackVmInstance -from snaps.openstack.create_keypairs import OpenStackKeypair -from snaps.openstack.create_network import OpenStackNetwork -from snaps.openstack.create_security_group import OpenStackSecurityGroup -from snaps.openstack.create_router import OpenStackRouter -from snaps.openstack.create_user import OpenStackUser - -import snaps.openstack.utils.glance_utils as glance_utils -from snaps.openstack.utils import keystone_utils - - -__author__ = "Shuya Nakama <shuya.nakama@okinawaopenlabs.org>" - - -class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase): - # pylint: disable=too-many-instance-attributes - """vrouter testcase deployed with Cloudify Orchestrator.""" - - __logger = logging.getLogger(__name__) - name = __name__ - - def __init__(self, **kwargs): - if "case_name" not in kwargs: - kwargs["case_name"] = "vyos_vrouter" - super(CloudifyVrouter, self).__init__(**kwargs) - - # Retrieve the configuration - try: - self.config = getattr( - config.CONF, 'vnf_{}_config'.format(self.case_name)) - except Exception: - raise Exception("VNF config file not found") - - self.cfy_manager_ip = '' - self.util_info = {} - self.deployment_name = '' - - config_file = os.path.join(self.case_dir, self.config) - self.orchestrator = dict( - requirements=functest_utils.get_parameter_from_yaml( - "orchestrator.requirements", config_file), - ) - self.details['orchestrator'] = dict( - name=functest_utils.get_parameter_from_yaml( - "orchestrator.name", config_file), - version=functest_utils.get_parameter_from_yaml( - "orchestrator.version", config_file), - status='ERROR', - result='' - ) - self.__logger.debug("Orchestrator configuration %s", self.orchestrator) - self.__logger.debug("name = %s", self.name) - self.vnf = dict( - descriptor=functest_utils.get_parameter_from_yaml( - "vnf.descriptor", config_file), - inputs=functest_utils.get_parameter_from_yaml( - "vnf.inputs", config_file), - requirements=functest_utils.get_parameter_from_yaml( - "vnf.requirements", config_file) - ) - self.details['vnf'] = dict( - descriptor_version=self.vnf['descriptor']['version'], - name=functest_utils.get_parameter_from_yaml( - "vnf.name", config_file), - version=functest_utils.get_parameter_from_yaml( - "vnf.version", config_file), - ) - self.__logger.debug("VNF configuration: %s", self.vnf) - - self.util = Utilvnf() - - self.details['test_vnf'] = dict( - name=functest_utils.get_parameter_from_yaml( - "vnf_test_suite.name", config_file), - version=functest_utils.get_parameter_from_yaml( - "vnf_test_suite.version", config_file) - ) - self.images = functest_utils.get_parameter_from_yaml( - "tenant_images", config_file) - self.__logger.info("Images needed for vrouter: %s", self.images) - - @staticmethod - def run_blocking_ssh_command(ssh, cmd, - error_msg="Unable to run this command"): - """Command to run ssh command with the exit status.""" - (_, stdout, stderr) = ssh.exec_command(cmd) - CloudifyVrouter.__logger.debug("SSH %s stdout: %s", cmd, stdout.read()) - if stdout.channel.recv_exit_status() != 0: - CloudifyVrouter.__logger.error( - "SSH %s stderr: %s", cmd, stderr.read()) - raise Exception(error_msg) - - def prepare(self): - super(CloudifyVrouter, self).prepare() - self.__logger.info("Additional pre-configuration steps") - self.util.set_credentials(self.snaps_creds) - self.__logger.info("Upload some OS images if it doesn't exist") - for image_name, image_file in self.images.iteritems(): - self.__logger.info("image: %s, file: %s", image_name, image_file) - if image_file and image_name: - image_creator = OpenStackImage( - self.snaps_creds, - ImageConfig( - name=image_name, image_user='cloud', - img_format='qcow2', image_file=image_file)) - image_creator.create() - self.created_object.append(image_creator) - - def deploy_orchestrator(self): - # pylint: disable=too-many-locals,too-many-statements - """ - Deploy Cloudify Manager. - network, security group, fip, VM creation - """ - # network creation - start_time = time.time() - self.__logger.info("Creating keypair ...") - kp_file = os.path.join(self.data_dir, "cloudify_vrouter.pem") - keypair_settings = KeypairConfig( - name='cloudify_vrouter_kp-{}'.format(self.uuid), - private_filepath=kp_file) - keypair_creator = OpenStackKeypair(self.snaps_creds, keypair_settings) - keypair_creator.create() - self.created_object.append(keypair_creator) - - self.__logger.info("Creating full network ...") - subnet_settings = SubnetConfig( - name='cloudify_vrouter_subnet-{}'.format(self.uuid), - cidr='10.67.79.0/24') - network_settings = NetworkConfig( - name='cloudify_vrouter_network-{}'.format(self.uuid), - subnet_settings=[subnet_settings]) - network_creator = OpenStackNetwork(self.snaps_creds, network_settings) - network_creator.create() - self.created_object.append(network_creator) - ext_net_name = snaps_utils.get_ext_net_name(self.snaps_creds) - router_creator = OpenStackRouter( - self.snaps_creds, - RouterConfig( - name='cloudify_vrouter_router-{}'.format(self.uuid), - external_gateway=ext_net_name, - internal_subnets=[subnet_settings.name])) - router_creator.create() - self.created_object.append(router_creator) - - # security group creation - self.__logger.info("Creating security group for cloudify manager vm") - sg_rules = list() - sg_rules.append( - SecurityGroupRuleConfig( - sec_grp_name="sg-cloudify-manager-{}".format(self.uuid), - direction=Direction.ingress, - protocol=Protocol.tcp, port_range_min=1, - port_range_max=65535)) - sg_rules.append( - SecurityGroupRuleConfig( - sec_grp_name="sg-cloudify-manager-{}".format(self.uuid), - direction=Direction.ingress, - protocol=Protocol.udp, port_range_min=1, - port_range_max=65535)) - - security_group_creator = OpenStackSecurityGroup( - self.snaps_creds, - SecurityGroupConfig( - name="sg-cloudify-manager-{}".format(self.uuid), - rule_settings=sg_rules)) - - security_group_creator.create() - self.created_object.append(security_group_creator) - - # orchestrator VM flavor - self.__logger.info("Get or create flavor for cloudify manager vm ...") - - flavor_settings = FlavorConfig( - name=self.orchestrator['requirements']['flavor']['name'], - ram=self.orchestrator['requirements']['flavor']['ram_min'], - disk=50, vcpus=2) - flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings) - flavor_creator.create() - self.created_object.append(flavor_creator) - image_settings = ImageConfig( - name=self.orchestrator['requirements']['os_image'], - image_user='centos', exists=True) - - port_settings = PortConfig( - name='cloudify_manager_port-{}'.format(self.uuid), - network_name=network_settings.name) - - manager_settings = VmInstanceConfig( - name='cloudify_manager-{}'.format(self.uuid), - flavor=flavor_settings.name, - port_settings=[port_settings], - security_group_names=[ - security_group_creator.sec_grp_settings.name], - floating_ip_settings=[FloatingIpConfig( - name='cloudify_manager_fip-{}'.format(self.uuid), - port_name=port_settings.name, - router_name=router_creator.router_settings.name)]) - - manager_creator = OpenStackVmInstance( - self.snaps_creds, manager_settings, image_settings, - keypair_settings) - - self.__logger.info("Creating cloudify manager VM") - manager_creator.create() - self.created_object.append(manager_creator) - - cfy_client = CloudifyClient( - host=manager_creator.get_floating_ip().ip, - username='admin', password='admin', tenant='default_tenant') - - self.orchestrator['object'] = cfy_client - - self.cfy_manager_ip = manager_creator.get_floating_ip().ip - - self.__logger.info("Attemps running status of the Manager") - cfy_status = None - retry = 10 - while str(cfy_status) != 'running' and retry: - try: - cfy_status = cfy_client.manager.get_status()['status'] - self.__logger.debug("The current manager status is %s", - cfy_status) - except Exception: # pylint: disable=broad-except - self.__logger.exception( - "Cloudify Manager isn't up and running. Retrying ...") - retry = retry - 1 - time.sleep(30) - - if str(cfy_status) == 'running': - self.__logger.info("Cloudify Manager is up and running") - else: - raise Exception("Cloudify Manager isn't up and running") - - duration = time.time() - start_time - - self.__logger.info("Put private keypair in manager") - if manager_creator.vm_ssh_active(block=True): - ssh = manager_creator.ssh_client() - scp = SCPClient(ssh.get_transport(), socket_timeout=15.0) - scp.put(kp_file, '~/') - cmd = "sudo cp ~/cloudify_vrouter.pem /etc/cloudify/" - self.run_blocking_ssh_command(ssh, cmd) - cmd = "sudo chmod 444 /etc/cloudify/cloudify_vrouter.pem" - self.run_blocking_ssh_command(ssh, cmd) - cmd = "sudo yum install -y gcc python-devel" - self.run_blocking_ssh_command( - ssh, cmd, "Unable to install packages on manager") - - self.details['orchestrator'].update(status='PASS', duration=duration) - - self.vnf['inputs'].update(dict(external_network_name=ext_net_name)) - - return True - - def deploy_vnf(self): - start_time = time.time() - - self.__logger.info("Upload VNFD") - cfy_client = self.orchestrator['object'] - descriptor = self.vnf['descriptor'] - self.deployment_name = descriptor.get('name') - - vrouter_blueprint_dir = os.path.join( - self.data_dir, self.util.blueprint_dir) - if not os.path.exists(vrouter_blueprint_dir): - Repo.clone_from( - descriptor.get('url'), vrouter_blueprint_dir, - branch=descriptor.get('version')) - - cfy_client.blueprints.upload( - vrouter_blueprint_dir + self.util.blueprint_file_name, - descriptor.get('name')) - - self.__logger.info("Get or create flavor for vrouter") - flavor_settings = FlavorConfig( - name=self.vnf['requirements']['flavor']['name'], - ram=self.vnf['requirements']['flavor']['ram_min'], - disk=25, vcpus=1) - flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings) - flavor = flavor_creator.create() - self.created_object.append(flavor_creator) - - # set image name - glance = glance_utils.glance_client(self.snaps_creds) - image = glance_utils.get_image(glance, "vyos1.1.7") - - user_creator = OpenStackUser( - self.snaps_creds, - UserConfig( - name='cloudify_network_bug-{}'.format(self.uuid), - password=str(uuid.uuid4()), - roles={'_member_': self.tenant_name})) - user_creator.create() - self.created_object.append(user_creator) - snaps_creds = user_creator.get_os_creds(self.snaps_creds.project_name) - - self.vnf['inputs'].update(dict(target_vnf_image_id=image.id)) - self.vnf['inputs'].update(dict(reference_vnf_image_id=image.id)) - self.vnf['inputs'].update(dict(target_vnf_flavor_id=flavor.id)) - self.vnf['inputs'].update(dict(reference_vnf_flavor_id=flavor.id)) - self.vnf['inputs'].update(dict( - keystone_username=snaps_creds.username)) - self.vnf['inputs'].update(dict( - keystone_password=snaps_creds.password)) - self.vnf['inputs'].update(dict( - keystone_tenant_name=snaps_creds.project_name)) - self.vnf['inputs'].update(dict( - region=snaps_creds.region_name)) - self.vnf['inputs'].update(dict( - keystone_url=keystone_utils.get_endpoint( - snaps_creds, 'identity'))) - - self.__logger.info("Create VNF Instance") - cfy_client.deployments.create( - descriptor.get('name'), descriptor.get('name'), - self.vnf.get('inputs')) - - wait_for_execution( - cfy_client, get_execution_id(cfy_client, descriptor.get('name')), - self.__logger, timeout=7200) - - self.__logger.info("Start the VNF Instance deployment") - execution = cfy_client.executions.start(descriptor.get('name'), - 'install') - # Show execution log - execution = wait_for_execution(cfy_client, execution, self.__logger) - - duration = time.time() - start_time - - self.__logger.info(execution) - if execution.status == 'terminated': - self.details['vnf'].update(status='PASS', duration=duration) - result = True - else: - self.details['vnf'].update(status='FAIL', duration=duration) - result = False - return result - - def test_vnf(self): - cfy_client = self.orchestrator['object'] - credentials = {"snaps_creds": self.snaps_creds, - "username": self.snaps_creds.username, - "password": self.snaps_creds.password, - "auth_url": self.snaps_creds.auth_url, - "tenant_name": self.snaps_creds.project_name} - - self.util_info = {"credentials": credentials, - "cfy": cfy_client, - "vnf_data_dir": self.util.vnf_data_dir} - - start_time = time.time() - - result, test_result_data = super(CloudifyVrouter, self).test_vnf() - - duration = time.time() - start_time - - if result: - self.details['test_vnf'].update( - status='PASS', result='OK', full_result=test_result_data, - duration=duration) - else: - self.details['test_vnf'].update( - status='FAIL', result='NG', full_result=test_result_data, - duration=duration) - - return True - - def clean(self): - try: - cfy_client = self.orchestrator['object'] - dep_name = self.vnf['descriptor'].get('name') - # kill existing execution - self.__logger.info('Deleting the current deployment') - exec_list = cfy_client.executions.list(dep_name) - for execution in exec_list: - if execution['status'] == "started": - try: - cfy_client.executions.cancel( - execution['id'], force=True) - except Exception: # pylint: disable=broad-except - self.__logger.warn("Can't cancel the current exec") - - execution = cfy_client.executions.start( - dep_name, 'uninstall', parameters=dict(ignore_failure=True)) - - wait_for_execution(cfy_client, execution, self.__logger) - cfy_client.deployments.delete(self.vnf['descriptor'].get('name')) - cfy_client.blueprints.delete(self.vnf['descriptor'].get('name')) - except Exception: # pylint: disable=broad-except - self.__logger.warn("Some issue during the undeployment ..") - self.__logger.warn("Tenant clean continue ..") - super(CloudifyVrouter, self).clean() - - def get_vnf_info_list(self, target_vnf_name): - return self.util.get_vnf_info_list( - self.cfy_manager_ip, self.deployment_name, target_vnf_name) - - -def wait_for_execution(client, execution, logger, timeout=7200, ): - """Wait for a workflow execution on Cloudify Manager.""" - # if execution already ended - return without waiting - if execution.status in Execution.END_STATES: - return execution - - if timeout is not None: - deadline = time.time() + timeout - - # Poll for execution status and execution logs, until execution ends - # and we receive an event of type in WORKFLOW_END_TYPES - offset = 0 - batch_size = 50 - event_list = [] - execution_ended = False - while True: - event_list = client.events.list( - execution_id=execution.id, _offset=offset, _size=batch_size, - include_logs=False, sort='@timestamp').items - - offset = offset + len(event_list) - for event in event_list: - logger.debug(event.get('message')) - - if timeout is not None: - if time.time() > deadline: - raise RuntimeError( - 'execution of operation {0} for deployment {1} ' - 'timed out'.format(execution.workflow_id, - execution.deployment_id)) - else: - # update the remaining timeout - timeout = deadline - time.time() - - if not execution_ended: - execution = client.executions.get(execution.id) - execution_ended = execution.status in Execution.END_STATES - - if execution_ended: - break - - time.sleep(5) - - return execution - - -def get_execution_id(client, deployment_id): - """ - Get the execution id of a env preparation. - network, security group, fip, VM creation - """ - executions = client.executions.list(deployment_id=deployment_id) - for execution in executions: - if execution.workflow_id == 'create_deployment_environment': - return execution - raise RuntimeError('Failed to get create_deployment_environment ' - 'workflow execution.' - 'Available executions: {0}'.format(executions)) diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml b/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml deleted file mode 100644 index 58bdb66a..00000000 --- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml +++ /dev/null @@ -1,33 +0,0 @@ ---- -tenant_images: - cloudify_manager_4.0: - /home/opnfv/functest/images/cloudify-manager-premium-4.0.1.qcow2 - vyos1.1.7: /home/opnfv/functest/images/vyos-1.1.7.img -test_data: - url: 'https://github.com/oolorg/opnfv-vnf-data.git' - branch: 'master' -orchestrator: - name: cloudify - version: '4.0' - requirements: - flavor: - name: m1.medium - ram_min: 4096 - os_image: 'cloudify_manager_4.0' -vnf: - name: vyos1.1.7 - version: '1.1.7' - descriptor: - url: https://github.com/oolorg/opnfv-vnf-vyos-blueprint/ - name: vrouter-opnfv - version: 'master' - requirements: - flavor: - name: m1.medium - ram_min: 2048 - inputs: - external_network_name: admin_floating_net - region: RegionOne -vnf_test_suite: - name: vyos-vrouter-test - version: "1.0" diff --git a/functest/opnfv_tests/vnf/router/test_controller/__init__.py b/functest/opnfv_tests/vnf/router/test_controller/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/opnfv_tests/vnf/router/test_controller/__init__.py +++ /dev/null diff --git a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py deleted file mode 100644 index be7bee88..00000000 --- a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Okinawa Open Laboratory and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -"""vrouter function test execution module""" - -import logging -import time -import yaml - -from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf -from functest.opnfv_tests.vnf.router.vnf_controller.vnf_controller import ( - VnfController) - - -class FunctionTestExec(object): - """vrouter function test execution class""" - - logger = logging.getLogger(__name__) - - def __init__(self, util_info): - self.logger.debug("init test exec") - self.util = Utilvnf() - credentials = util_info["credentials"] - self.vnf_ctrl = VnfController(util_info) - - test_cmd_map_file = open(self.util.vnf_data_dir + - self.util.opnfv_vnf_data_dir + - self.util.command_template_dir + - self.util.test_cmd_map_yaml_file, - 'r') - self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file) - test_cmd_map_file.close() - - self.util.set_credentials(credentials["snaps_creds"]) - - with open(self.util.test_env_config_yaml) as file_fd: - test_env_config_yaml = yaml.safe_load(file_fd) - file_fd.close() - - self.protocol_stable_wait = test_env_config_yaml.get("general").get( - "protocol_stable_wait") - - def config_target_vnf(self, target_vnf, reference_vnf, test_kind): - self.logger.debug("Configuration to target vnf") - test_info = self.test_cmd_map_yaml[target_vnf["os_type"]] - test_cmd_file_path = test_info[test_kind]["pre_command_target"] - target_parameter_file_path = test_info[test_kind]["parameter_target"] - prompt_file_path = test_info["prompt"] - - return self.vnf_ctrl.config_vnf(target_vnf, - reference_vnf, - test_cmd_file_path, - target_parameter_file_path, - prompt_file_path) - - def config_reference_vnf(self, target_vnf, reference_vnf, test_kind): - self.logger.debug("Configuration to reference vnf") - test_info = self.test_cmd_map_yaml[reference_vnf["os_type"]] - test_cmd_file_path = test_info[test_kind]["pre_command_reference"] - reference_parameter_file_path = test_info[test_kind][ - "parameter_reference"] - prompt_file_path = test_info["prompt"] - - return self.vnf_ctrl.config_vnf(reference_vnf, - target_vnf, - test_cmd_file_path, - reference_parameter_file_path, - prompt_file_path) - - def result_check(self, target_vnf, reference_vnf, test_kind, test_list): - test_info = self.test_cmd_map_yaml[target_vnf["os_type"]] - target_parameter_file_path = test_info[test_kind]["parameter_target"] - prompt_file_path = test_info["prompt"] - check_rule_file_path_list = [] - - for test in test_list: - check_rule_file_path_list.append(test_info[test_kind][test]) - - return self.vnf_ctrl.result_check(target_vnf, - reference_vnf, - check_rule_file_path_list, - target_parameter_file_path, - prompt_file_path) - - def run(self, target_vnf, reference_vnf_list, test_info, test_list): - test_result_data = {} - test_kind = test_info["protocol"] - for reference_vnf in reference_vnf_list: - self.logger.debug("Start config command " + - target_vnf["vnf_name"] + " and " + - reference_vnf["vnf_name"]) - - result = self.config_target_vnf(target_vnf, - reference_vnf, - test_kind) - if not result: - return False, test_result_data - - result = self.config_reference_vnf(target_vnf, - reference_vnf, - test_kind) - if not result: - return False, test_result_data - - self.logger.debug("Finish config command.") - - self.logger.debug("Waiting for protocol stable.") - time.sleep(self.protocol_stable_wait) - - self.logger.debug("Start check method") - - (result, res_dict_data_list) = self.result_check(target_vnf, - reference_vnf, - test_kind, - test_list) - - test_result_data = {"test_kind": test_info["test_kind"], - "protocol": test_info["protocol"], - "result": res_dict_data_list} - - if not result: - self.logger.debug("Error check method.") - return False, test_result_data - - self.logger.debug("Finish check method.") - - return True, test_result_data diff --git a/functest/opnfv_tests/vnf/router/test_scenario.yaml b/functest/opnfv_tests/vnf/router/test_scenario.yaml deleted file mode 100644 index 8bdaf06a..00000000 --- a/functest/opnfv_tests/vnf/router/test_scenario.yaml +++ /dev/null @@ -1,28 +0,0 @@ ---- -test_scenario_list: - - - test_type: 'function_test' - vnf_list: - - - vnf_name: 'target_vnf' - os_type: 'vyos' - image_name: 'vyos1.1.7' - flavor_name: 'm1.medium' - - - vnf_name: 'reference_vnf' - os_type: 'vyos' - image_name: 'vyos1.1.7' - flavor_name: 'm1.medium' - function_test_list: - - - target_vnf_name: 'target_vnf' - test_list: - - - test_kind: 'Interoperability' - protocol: 'BGP' - BGP: - - 'Checking_the_peer_of_BGP' - - 'Checking_the_status_of_BGP' - - 'Checking_the_advertised_routes' - - 'Checking_the_received_routes' - - 'Checking_the_routing_table' diff --git a/functest/opnfv_tests/vnf/router/utilvnf.py b/functest/opnfv_tests/vnf/router/utilvnf.py deleted file mode 100644 index 6861b386..00000000 --- a/functest/opnfv_tests/vnf/router/utilvnf.py +++ /dev/null @@ -1,323 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Okinawa Open Laboratory and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -""" Utility module of vrouter testcase """ - -import json -import logging -import os -import pkg_resources -import requests -import yaml - -from functest.utils import config -from git import Repo -from requests.auth import HTTPBasicAuth -from snaps.openstack.utils import nova_utils - - -RESULT_SPRIT_INDEX = { - "transfer": 8, - "bandwidth": 6, - "jitter": 4, - "los_total": 2, - "pkt_loss": 1 -} - -BIT_PER_BYTE = 8 - -NOVA_CLIENT_API_VERSION = '2' -NOVA_CILENT_NETWORK_INFO_INDEX = 0 -CFY_INFO_OUTPUT_FILE = "output.txt" - -CIDR_NETWORK_SEGMENT_INFO_INDEX = 0 -PACKET_LOST_INFO_INDEX = 0 -PACKET_TOTAL_INFO_INDEX = 1 - -NUMBER_OF_DIGITS_FOR_AVG_TRANSFER = 0 -NUMBER_OF_DIGITS_FOR_AVG_BANDWIDTH = 0 -NUMBER_OF_DIGITS_FOR_AVG_JITTER = 3 -NUMBER_OF_DIGITS_FOR_AVG_PKT_LOSS = 1 - - -class Utilvnf(object): # pylint: disable=too-many-instance-attributes - """ Utility class of vrouter testcase """ - - logger = logging.getLogger(__name__) - - def __init__(self): - self.snaps_creds = "" - self.vnf_data_dir = getattr(config.CONF, 'dir_router_data') - self.opnfv_vnf_data_dir = "opnfv-vnf-data/" - self.command_template_dir = "command_template/" - self.test_scenario_yaml = "test_scenario.yaml" - test_env_config_yaml_file = "test_env_config.yaml" - self.test_cmd_map_yaml_file = "test_cmd_map.yaml" - self.test_env_config_yaml = os.path.join( - self.vnf_data_dir, - self.opnfv_vnf_data_dir, - test_env_config_yaml_file) - - self.blueprint_dir = "opnfv-vnf-vyos-blueprint/" - self.blueprint_file_name = "function-test-openstack-blueprint.yaml" - - if not os.path.exists(self.vnf_data_dir): - os.makedirs(self.vnf_data_dir) - - case_dir = pkg_resources.resource_filename( - 'functest', 'opnfv_tests/vnf/router') - - config_file_name = getattr( - config.CONF, 'vnf_{}_config'.format("vyos_vrouter")) - - config_file = os.path.join(case_dir, config_file_name) - - with open(config_file) as file_fd: - vrouter_config_yaml = yaml.safe_load(file_fd) - file_fd.close() - - test_data = vrouter_config_yaml.get("test_data") - - self.logger.debug("Downloading the test data.") - vrouter_data_path = self.vnf_data_dir + self.opnfv_vnf_data_dir - - if not os.path.exists(vrouter_data_path): - Repo.clone_from(test_data['url'], - vrouter_data_path, - branch=test_data['branch']) - - with open(self.test_env_config_yaml) as file_fd: - test_env_config_yaml = yaml.safe_load(file_fd) - file_fd.close() - - self.image = test_env_config_yaml.get( - "general").get("images").get("vyos") - self.tester_image = test_env_config_yaml.get( - "general").get("images").get("tester_vm_os") - - self.test_result_json_file = "test_result.json" - if os.path.isfile(self.test_result_json_file): - os.remove(self.test_result_json_file) - self.logger.debug("removed %s", self.test_result_json_file) - - def get_nova_client(self): - nova_client = nova_utils.nova_client(self.snaps_creds) - - return nova_client - - def set_credentials(self, snaps_creds): - self.snaps_creds = snaps_creds - - def get_address(self, server_name, network_name): - nova_client = self.get_nova_client() - servers_list = nova_client.servers.list() - server = None - - for server in servers_list: - if server.name == server_name: - break - - address = server.addresses[ - network_name][NOVA_CILENT_NETWORK_INFO_INDEX]["addr"] - - return address - - def get_mac_address(self, server_name, network_name): - nova_client = self.get_nova_client() - servers_list = nova_client.servers.list() - server = None - - for server in servers_list: - if server.name == server_name: - break - - mac_address = server.addresses[network_name][ - NOVA_CILENT_NETWORK_INFO_INDEX]["OS-EXT-IPS-MAC:mac_addr"] - - return mac_address - - def reboot_vm(self, server_name): - nova_client = self.get_nova_client() - servers_list = nova_client.servers.list() - server = None - - for server in servers_list: - if server.name == server_name: - break - - server.reboot() - - return - - def delete_vm(self, server_name): - nova_client = self.get_nova_client() - servers_list = nova_client.servers.list() - server = None - - for server in servers_list: - if server.name == server_name: - nova_client.servers.delete(server) - break - - return - - def get_blueprint_outputs(self, cfy_manager_ip, deployment_name): - url = "http://%s/deployments/%s/outputs" % ( - cfy_manager_ip, deployment_name) - - response = requests.get( - url, - auth=HTTPBasicAuth('admin', 'admin'), - headers={'Tenant': 'default_tenant'}) - - resp_data = response.json() - self.logger.debug(resp_data) - data = resp_data["outputs"] - return data - - def get_blueprint_outputs_vnfs(self, cfy_manager_ip, deployment_name): - outputs = self.get_blueprint_outputs(cfy_manager_ip, - deployment_name) - vnfs = outputs["vnfs"] - vnf_list = [] - for vnf_name in vnfs: - vnf_list.append(vnfs[vnf_name]) - return vnf_list - - def get_blueprint_outputs_networks(self, cfy_manager_ip, deployment_name): - outputs = self.get_blueprint_outputs(cfy_manager_ip, - deployment_name) - networks = outputs["networks"] - network_list = [] - for network_name in networks: - network_list.append(networks[network_name]) - return network_list - - def request_vnf_reboot(self, vnf_info_list): - for vnf in vnf_info_list: - self.logger.debug("reboot the " + vnf["vnf_name"]) - self.reboot_vm(vnf["vnf_name"]) - - def request_vm_delete(self, vnf_info_list): - for vnf in vnf_info_list: - self.logger.debug("delete the " + vnf["vnf_name"]) - self.delete_vm(vnf["vnf_name"]) - - def get_vnf_info_list(self, cfy_manager_ip, topology_deploy_name, - target_vnf_name): - network_list = self.get_blueprint_outputs_networks( - cfy_manager_ip, - topology_deploy_name) - vnf_info_list = self.get_blueprint_outputs_vnfs(cfy_manager_ip, - topology_deploy_name) - for vnf in vnf_info_list: - vnf_name = vnf["vnf_name"] - vnf["os_type"] = self.image["os_type"] - vnf["user"] = self.image["user"] - vnf["pass"] = self.image["pass"] - - vnf["target_vnf_flag"] = bool(vnf_name == target_vnf_name) - - self.logger.debug("vnf name : " + vnf_name) - self.logger.debug(vnf_name + " floating ip address : " + - vnf["floating_ip"]) - - for network in network_list: - network_name = network["network_name"] - ip_address = self.get_address(vnf["vnf_name"], - network["network_name"]) - vnf[network_name + "_ip"] = ip_address - mac = self.get_mac_address(vnf["vnf_name"], - network["network_name"]) - vnf[network_name + "_mac"] = mac - - self.logger.debug(network_name + "_ip of " + vnf["vnf_name"] + - " : " + vnf[network_name + "_ip"]) - self.logger.debug(network_name + "_mac of " + vnf["vnf_name"] + - " : " + vnf[network_name + "_mac"]) - - return vnf_info_list - - @staticmethod - def get_target_vnf(vnf_info_list): - for vnf in vnf_info_list: - if vnf["target_vnf_flag"]: - return vnf - - return None - - @staticmethod - def get_reference_vnf_list(vnf_info_list): - reference_vnf_list = [] - for vnf in vnf_info_list: - if not vnf["target_vnf_flag"]: - reference_vnf_list.append(vnf) - - return reference_vnf_list - - @staticmethod - def get_vnf_info(vnf_info_list, vnf_name): - for vnf in vnf_info_list: - if vnf["vnf_name"] == vnf_name: - return vnf - - return None - - @staticmethod - def convert_functional_test_result(result_data_list): - result = {} - for result_data in result_data_list: - test_kind = result_data["test_kind"] - protocol = result_data["protocol"] - test_result_data = result_data["result"] - - if test_kind not in result: - result[test_kind] = [] - - result[test_kind].append({protocol: test_result_data}) - - return {"Functional_test": result} - - def write_result_data(self, result_data): - test_result = [] - if not os.path.isfile(self.test_result_json_file): - file_fd = open(self.test_result_json_file, "w") - file_fd.close() - else: - file_fd = open(self.test_result_json_file, "r") - test_result = json.load(file_fd) - file_fd.close() - - test_result.append(result_data) - - file_fd = open(self.test_result_json_file, "w") - json.dump(test_result, file_fd) - file_fd.close() - - def output_test_result_json(self): - if os.path.isfile(self.test_result_json_file): - file_fd = open(self.test_result_json_file, "r") - test_result = json.load(file_fd) - file_fd.close() - output_json_data = json.dumps(test_result, - sort_keys=True, - indent=4) - self.logger.debug("test_result %s", output_json_data) - else: - self.logger.debug("Not found %s", self.test_result_json_file) - - @staticmethod - def get_test_scenario(file_path): - test_scenario_file = open(file_path, - 'r') - test_scenario_yaml = yaml.safe_load(test_scenario_file) - test_scenario_file.close() - return test_scenario_yaml["test_scenario_list"] diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/__init__.py b/functest/opnfv_tests/vnf/router/vnf_controller/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/opnfv_tests/vnf/router/vnf_controller/__init__.py +++ /dev/null diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/checker.py b/functest/opnfv_tests/vnf/router/vnf_controller/checker.py deleted file mode 100644 index a7a70f6d..00000000 --- a/functest/opnfv_tests/vnf/router/vnf_controller/checker.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Okinawa Open Laboratory and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -"""vrouter test result check module""" - -import json -import logging -import re - -from jinja2 import Environment, FileSystemLoader - - -class Checker(object): - """vrouter test result check class""" - - logger = logging.getLogger(__name__) - - def __init__(self): - self.logger.debug("init checker") - - @staticmethod - def load_check_rule(rule_file_dir, rule_file_name, parameter): - loader = FileSystemLoader(rule_file_dir, - encoding='utf8') - env = Environment(loader=loader) - check_rule_template = env.get_template(rule_file_name) - check_rule = check_rule_template.render(parameter) - check_rule_data = json.loads(check_rule) - return check_rule_data - - @staticmethod - def regexp_information(response, rules): - status = False - result_data = {} - - for rule in rules["rules"]: - result_data = { - "test_name": rule["description"], - "result": "NG" - } - - match = re.search(rule["regexp"], - response) - rule["response"] = response - if match is None: - status = False - break - - if not match.group(1) == rule["result"]: - status = False - else: - result_data["result"] = "OK" - status = True - - return status, result_data diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py b/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py deleted file mode 100644 index 7d9116bc..00000000 --- a/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Okinawa Open Laboratory and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -"""command generator module for vrouter testing""" - -import logging -from jinja2 import Environment, FileSystemLoader - - -class CommandGenerator(object): - """command generator class for vrouter testing""" - - logger = logging.getLogger(__name__) - - def __init__(self): - self.logger.debug("init command generator") - - @staticmethod - def load_template(template_dir, template): - # pylint disable=missing-docstring - loader = FileSystemLoader(template_dir, - encoding='utf8') - env = Environment(loader=loader) - return env.get_template(template) - - @staticmethod - def command_create(template, parameter): - # pylint disable=missing-docstring - commands = template.render(parameter) - return commands.split('\n') diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py deleted file mode 100644 index 628afd30..00000000 --- a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Okinawa Open Laboratory and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""ssh client module for vrouter testing""" - -import logging -import time -import yaml - -import paramiko - -from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf - -RECEIVE_ROOP_WAIT = 1 - -DEFAULT_CONNECT_TIMEOUT = 10 -DEFAULT_CONNECT_RETRY_COUNT = 10 -DEFAULT_SEND_TIMEOUT = 10 - - -class SshClient(object): # pylint: disable=too-many-instance-attributes - """ssh client class for vrouter testing""" - - logger = logging.getLogger(__name__) - - def __init__(self, ip_address, user, password=None, key_filename=None): - self.ip_address = ip_address - self.user = user - self.password = password - self.key_filename = key_filename - self.connected = False - self.shell = None - - self.logger.setLevel(logging.INFO) - - self.ssh = paramiko.SSHClient() - self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - self.util = Utilvnf() - with open(self.util.test_env_config_yaml) as file_fd: - test_env_config_yaml = yaml.safe_load(file_fd) - file_fd.close() - - self.ssh_revieve_buff = test_env_config_yaml.get("general").get( - "ssh_receive_buffer") - - def connect(self, time_out=DEFAULT_CONNECT_TIMEOUT, - retrycount=DEFAULT_CONNECT_RETRY_COUNT): - # pylint: disable=missing-docstring - while retrycount > 0: - try: - self.logger.info("SSH connect to %s.", self.ip_address) - self.ssh.connect(self.ip_address, - username=self.user, - password=self.password, - key_filename=self.key_filename, - timeout=time_out, - look_for_keys=False, - allow_agent=False) - - self.logger.info("SSH connection established to %s.", - self.ip_address) - - self.shell = self.ssh.invoke_shell() - - while not self.shell.recv_ready(): - time.sleep(RECEIVE_ROOP_WAIT) - - self.shell.recv(self.ssh_revieve_buff) - break - except Exception: # pylint: disable=broad-except - self.logger.info("SSH timeout for %s...", self.ip_address) - time.sleep(time_out) - retrycount -= 1 - - if retrycount == 0: - self.logger.error("Cannot establish connection to IP '%s'. " + - "Aborting", - self.ip_address) - self.connected = False - return self.connected - - self.connected = True - return self.connected - - def send(self, cmd, prompt, timeout=DEFAULT_SEND_TIMEOUT): - # pylint: disable=missing-docstring - if self.connected is True: - self.shell.settimeout(timeout) - self.logger.debug("Commandset : '%s'", cmd) - - try: - self.shell.send(cmd + '\n') - except Exception: # pylint: disable=broad-except - self.logger.error("ssh send timeout : Command : '%s'", cmd) - return None - - res_buff = '' - while not res_buff.endswith(prompt): - time.sleep(RECEIVE_ROOP_WAIT) - try: - res = self.shell.recv(self.ssh_revieve_buff) - except Exception: # pylint: disable=broad-except - self.logger.error("ssh receive timeout : Command : '%s'", - cmd) - break - - res_buff += res - - self.logger.debug("Response : '%s'", res_buff) - return res_buff - self.logger.error("Cannot connected to IP '%s'.", self.ip_address) - return None - - def close(self): - # pylint: disable=missing-docstring - if self.connected is True: - self.ssh.close() - - @staticmethod - def error_check(response, err_strs=None): - # pylint: disable=missing-docstring - if err_strs is None: - err_strs = ["error", "warn", "unknown command", "already exist"] - for err in err_strs: - if err in response: - return False - - return True diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py deleted file mode 100644 index 10e48645..00000000 --- a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py +++ /dev/null @@ -1,146 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Okinawa Open Laboratory and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -"""vm controll module""" - -import logging -import os -import time -import yaml - -from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf -from functest.opnfv_tests.vnf.router.vnf_controller.command_generator import ( - CommandGenerator) -from functest.opnfv_tests.vnf.router.vnf_controller.ssh_client import ( - SshClient) - - -class VmController(object): - """vm controll class""" - - logger = logging.getLogger(__name__) - - def __init__(self, util_info): - self.logger.debug("initialize vm controller") - self.command_gen = CommandGenerator() - credentials = util_info["credentials"] - - self.util = Utilvnf() - self.util.set_credentials(credentials["snaps_creds"]) - - with open(self.util.test_env_config_yaml) as file_fd: - test_env_config_yaml = yaml.safe_load(file_fd) - file_fd.close() - - self.reboot_wait = test_env_config_yaml.get("general").get( - "reboot_wait") - self.command_wait = test_env_config_yaml.get("general").get( - "command_wait") - self.ssh_connect_timeout = test_env_config_yaml.get("general").get( - "ssh_connect_timeout") - self.ssh_connect_retry_count = test_env_config_yaml.get("general").get( - "ssh_connect_retry_count") - - def command_gen_from_template(self, command_file_path, cmd_input_param): - (command_file_dir, command_file_name) = os.path.split( - command_file_path) - template = self.command_gen.load_template(command_file_dir, - command_file_name) - return self.command_gen.command_create(template, - cmd_input_param) - - def config_vm(self, vm_info, test_cmd_file_path, - cmd_input_param, prompt_file_path): - ssh = self.connect_ssh_and_config_vm(vm_info, - test_cmd_file_path, - cmd_input_param, - prompt_file_path) - if ssh is None: - return False - - ssh.close() - - return True - - def connect_ssh_and_config_vm(self, vm_info, test_cmd_file_path, - cmd_input_param, prompt_file_path): - - key_filename = None - if "key_path" in vm_info: - key_filename = vm_info["key_path"] - - ssh = SshClient(ip_address=vm_info["floating_ip"], - user=vm_info["user"], - password=vm_info["pass"], - key_filename=key_filename) - - result = ssh.connect(self.ssh_connect_timeout, - self.ssh_connect_retry_count) - if not result: - self.logger.debug("try to vm reboot.") - self.util.reboot_vm(vm_info["vnf_name"]) - time.sleep(self.reboot_wait) - result = ssh.connect(self.ssh_connect_timeout, - self.ssh_connect_retry_count) - if not result: - return None - - (result, _) = self.command_create_and_execute( - ssh, - test_cmd_file_path, - cmd_input_param, - prompt_file_path) - if not result: - ssh.close() - return None - - return ssh - - def command_create_and_execute(self, ssh, test_cmd_file_path, - cmd_input_param, prompt_file_path): - prompt_file = open(prompt_file_path, - 'r') - prompt = yaml.safe_load(prompt_file) - prompt_file.close() - config_mode_prompt = prompt["config_mode"] - - commands = self.command_gen_from_template(test_cmd_file_path, - cmd_input_param) - return self.command_list_execute(ssh, - commands, - config_mode_prompt) - - def command_list_execute(self, ssh, command_list, prompt): - res_data_list = [] - for command in command_list: - self.logger.debug("Command : " + command) - (res, res_data) = self.command_execute(ssh, - command, - prompt) - self.logger.debug("Response : " + res_data) - res_data_list.append(res_data) - if not res: - return res, res_data_list - - time.sleep(self.command_wait) - - return True, res_data_list - - def command_execute(self, ssh, command, prompt): - res_data = ssh.send(command, prompt) - if res_data is None: - self.logger.info("retry send command : " + command) - res_data = ssh.send(command, - prompt) - if not ssh.error_check(res_data): - return False, res_data - - return True, res_data diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py deleted file mode 100644 index a5b1ad85..00000000 --- a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Okinawa Open Laboratory and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -"""vrouter controll module""" - -import logging -import os -import time -import yaml - -import prettytable - -from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf -from functest.opnfv_tests.vnf.router.vnf_controller.checker import Checker -from functest.opnfv_tests.vnf.router.vnf_controller.ssh_client import ( - SshClient) -from functest.opnfv_tests.vnf.router.vnf_controller.vm_controller import ( - VmController) - - -class VnfController(object): - """vrouter controll class""" - - logger = logging.getLogger(__name__) - - def __init__(self, util_info): - self.logger.debug("init vnf controller") - self.util = Utilvnf() - self.vm_controller = VmController(util_info) - - with open(self.util.test_env_config_yaml) as file_fd: - test_env_config_yaml = yaml.safe_load(file_fd) - file_fd.close() - - self.cmd_wait = test_env_config_yaml.get("general").get("command_wait") - self.ssh_connect_timeout = test_env_config_yaml.get("general").get( - "ssh_connect_timeout") - self.ssh_connect_retry_count = test_env_config_yaml.get("general").get( - "ssh_connect_retry_count") - - def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path, - parameter_file_path, prompt_file_path): - # pylint: disable=too-many-arguments - parameter_file = open(parameter_file_path, - 'r') - cmd_input_param = yaml.safe_load(parameter_file) - parameter_file.close() - - cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"] - cmd_input_param["source_ip"] = source_vnf["data_plane_network_ip"] - cmd_input_param["destination_ip"] = destination_vnf[ - "data_plane_network_ip"] - - return self.vm_controller.config_vm(source_vnf, - test_cmd_file_path, - cmd_input_param, - prompt_file_path) - - def result_check(self, target_vnf, reference_vnf, - check_rule_file_path_list, parameter_file_path, - prompt_file_path): - # pylint: disable=too-many-arguments,too-many-locals - - res_dict_data_list = [] - - parameter_file = open(parameter_file_path, - 'r') - cmd_input_param = yaml.safe_load(parameter_file) - parameter_file.close() - - cmd_input_param["source_ip"] = target_vnf["data_plane_network_ip"] - cmd_input_param["destination_ip"] = reference_vnf[ - "data_plane_network_ip"] - - prompt_file = open(prompt_file_path, - 'r') - prompt = yaml.safe_load(prompt_file) - prompt_file.close() - terminal_mode_prompt = prompt["terminal_mode"] - - ssh = SshClient(target_vnf["floating_ip"], - target_vnf["user"], - target_vnf["pass"]) - - result = ssh.connect(self.ssh_connect_timeout, - self.ssh_connect_retry_count) - if not result: - return False, res_dict_data_list - - checker = Checker() - - res_table = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['test item', 'result']) - - status = True - res_data_list = [] - for check_rule_file_path in check_rule_file_path_list: - (check_rule_dir, check_rule_file) = os.path.split( - check_rule_file_path) - check_rules = checker.load_check_rule(check_rule_dir, - check_rule_file, - cmd_input_param) - (res, res_data) = self.vm_controller.command_execute( - ssh, - check_rules["command"], - terminal_mode_prompt) - res_data_list.append(res_data) - if not res: - status = False - break - - (res, res_dict_data) = checker.regexp_information(res_data, - check_rules) - res_dict_data_list.append(res_dict_data) - res_table.add_row([res_dict_data["test_name"], - res_dict_data["result"]]) - if not res: - status = False - - time.sleep(self.cmd_wait) - - ssh.close() - - self.logger.info("Test result:\n\n%s\n", res_table.get_string()) - - self.output_check_result_detail_data(res_data_list) - - return status, res_dict_data_list - - def output_check_result_detail_data(self, res_data_list): - for res_data in res_data_list: - self.logger.debug(res_data) diff --git a/functest/opnfv_tests/vnf/router/vrouter_base.py b/functest/opnfv_tests/vnf/router/vrouter_base.py deleted file mode 100644 index 8818032d..00000000 --- a/functest/opnfv_tests/vnf/router/vrouter_base.py +++ /dev/null @@ -1,122 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Okinawa Open Laboratory and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -"""vrouter testing base class module""" - -import datetime -import json -import logging -import os -import time - -import pkg_resources - -import functest.core.vnf as vnf -from functest.utils import config -from functest.opnfv_tests.vnf.router.test_controller import function_test_exec -from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf - -__author__ = "Shuya Nakama <shuya.nakama@okinawaopenlabs.org>" - -REBOOT_WAIT = 30 - - -class VrouterOnBoardingBase(vnf.VnfOnBoarding): - """vrouter testing base class""" - - def __init__(self, **kwargs): - self.logger = logging.getLogger(__name__) - super(VrouterOnBoardingBase, self).__init__(**kwargs) - self.case_dir = pkg_resources.resource_filename( - 'functest', 'opnfv_tests/vnf/router') - self.data_dir = getattr(config.CONF, 'dir_router_data') - self.result_dir = os.path.join(getattr(config.CONF, 'dir_results'), - self.case_name) - self.util = Utilvnf() - self.util_info = {} - - self.vnf_list = [] - - if not os.path.exists(self.data_dir): - os.makedirs(self.data_dir) - if not os.path.exists(self.result_dir): - os.makedirs(self.result_dir) - - def test_vnf(self): - """vrouter test execution""" - result = False - test_result_data_list = [] - test_scenario_file_path = os.path.join(self.case_dir, - self.util.test_scenario_yaml) - test_scenario_list = self.util.get_test_scenario( - test_scenario_file_path) - for test_scenario in test_scenario_list: - if test_scenario["test_type"] == "function_test": - function_test_list = test_scenario["function_test_list"] - for function_test in function_test_list: - test_list = function_test["test_list"] - target_vnf_name = function_test["target_vnf_name"] - for test_info in test_list: - self.logger.info(test_info["protocol"] + " " + - test_info["test_kind"] + - " test.") - (result, result_data) = self.function_test_vrouter( - target_vnf_name, test_info) - test_result_data_list.append(result_data) - if not result: - break - - self.util.request_vm_delete(self.vnf_list) - - test_result_data = json.dumps(test_result_data_list, indent=4) - - return result, test_result_data - - def function_test_vrouter(self, target_vnf_name, test_info): - """function test execution""" - - test_protocol = test_info["protocol"] - test_list = test_info[test_protocol] - - vnf_info_list = self.get_vnf_info_list(target_vnf_name) - self.vnf_list = vnf_info_list - - self.logger.debug("request vnf's reboot.") - self.util.request_vnf_reboot(vnf_info_list) - time.sleep(REBOOT_WAIT) - - target_vnf = self.util.get_target_vnf(vnf_info_list) - - reference_vnf_list = self.util.get_reference_vnf_list(vnf_info_list) - - test_exec = function_test_exec.FunctionTestExec(self.util_info) - - # start test - start_time_ts = time.time() - self.logger.info("vRouter test Start Time:'%s'", ( - datetime.datetime.fromtimestamp(start_time_ts).strftime( - '%Y-%m-%d %H:%M:%S'))) - - (result, test_result_data) = test_exec.run(target_vnf, - reference_vnf_list, - test_info, - test_list) - - end_time_ts = time.time() - duration = round(end_time_ts - start_time_ts, 1) - self.logger.info("vRouter test duration :'%s'", duration) - - return result, test_result_data - - def get_vnf_info_list(self, target_vnf_name): - # pylint: disable=unused-argument,no-self-use - vnf_info_list = [] - return vnf_info_list |