aboutsummaryrefslogtreecommitdiffstats
path: root/functest/opnfv_tests/vnf/router
diff options
context:
space:
mode:
Diffstat (limited to 'functest/opnfv_tests/vnf/router')
-rw-r--r--functest/opnfv_tests/vnf/router/__init__.py0
-rw-r--r--functest/opnfv_tests/vnf/router/cloudify_vrouter.py499
-rw-r--r--functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml33
-rw-r--r--functest/opnfv_tests/vnf/router/test_controller/__init__.py0
-rw-r--r--functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py135
-rw-r--r--functest/opnfv_tests/vnf/router/test_scenario.yaml28
-rw-r--r--functest/opnfv_tests/vnf/router/utilvnf.py323
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/__init__.py0
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/checker.py63
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py38
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py135
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py146
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py141
-rw-r--r--functest/opnfv_tests/vnf/router/vrouter_base.py122
14 files changed, 0 insertions, 1663 deletions
diff --git a/functest/opnfv_tests/vnf/router/__init__.py b/functest/opnfv_tests/vnf/router/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/opnfv_tests/vnf/router/__init__.py
+++ /dev/null
diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
deleted file mode 100644
index 18acce6f..00000000
--- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
+++ /dev/null
@@ -1,499 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Okinawa Open Laboratory and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-"""vrouter testcase implementation."""
-
-import logging
-import os
-import time
-import uuid
-
-from cloudify_rest_client import CloudifyClient
-from cloudify_rest_client.executions import Execution
-from scp import SCPClient
-
-from functest.opnfv_tests.openstack.snaps import snaps_utils
-import functest.opnfv_tests.vnf.router.vrouter_base as vrouter_base
-from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
-from functest.utils import config
-from functest.utils import functest_utils
-
-from git import Repo
-
-from snaps.config.flavor import FlavorConfig
-from snaps.config.image import ImageConfig
-from snaps.config.keypair import KeypairConfig
-from snaps.config.network import NetworkConfig, PortConfig, SubnetConfig
-from snaps.config.router import RouterConfig
-from snaps.config.security_group import (
- Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig)
-from snaps.config.user import UserConfig
-from snaps.config.vm_inst import FloatingIpConfig, VmInstanceConfig
-
-from snaps.openstack.create_flavor import OpenStackFlavor
-from snaps.openstack.create_image import OpenStackImage
-from snaps.openstack.create_instance import OpenStackVmInstance
-from snaps.openstack.create_keypairs import OpenStackKeypair
-from snaps.openstack.create_network import OpenStackNetwork
-from snaps.openstack.create_security_group import OpenStackSecurityGroup
-from snaps.openstack.create_router import OpenStackRouter
-from snaps.openstack.create_user import OpenStackUser
-
-import snaps.openstack.utils.glance_utils as glance_utils
-from snaps.openstack.utils import keystone_utils
-
-
-__author__ = "Shuya Nakama <shuya.nakama@okinawaopenlabs.org>"
-
-
-class CloudifyVrouter(vrouter_base.VrouterOnBoardingBase):
- # pylint: disable=too-many-instance-attributes
- """vrouter testcase deployed with Cloudify Orchestrator."""
-
- __logger = logging.getLogger(__name__)
- name = __name__
-
- def __init__(self, **kwargs):
- if "case_name" not in kwargs:
- kwargs["case_name"] = "vyos_vrouter"
- super(CloudifyVrouter, self).__init__(**kwargs)
-
- # Retrieve the configuration
- try:
- self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
-
- self.cfy_manager_ip = ''
- self.util_info = {}
- self.deployment_name = ''
-
- config_file = os.path.join(self.case_dir, self.config)
- self.orchestrator = dict(
- requirements=functest_utils.get_parameter_from_yaml(
- "orchestrator.requirements", config_file),
- )
- self.details['orchestrator'] = dict(
- name=functest_utils.get_parameter_from_yaml(
- "orchestrator.name", config_file),
- version=functest_utils.get_parameter_from_yaml(
- "orchestrator.version", config_file),
- status='ERROR',
- result=''
- )
- self.__logger.debug("Orchestrator configuration %s", self.orchestrator)
- self.__logger.debug("name = %s", self.name)
- self.vnf = dict(
- descriptor=functest_utils.get_parameter_from_yaml(
- "vnf.descriptor", config_file),
- inputs=functest_utils.get_parameter_from_yaml(
- "vnf.inputs", config_file),
- requirements=functest_utils.get_parameter_from_yaml(
- "vnf.requirements", config_file)
- )
- self.details['vnf'] = dict(
- descriptor_version=self.vnf['descriptor']['version'],
- name=functest_utils.get_parameter_from_yaml(
- "vnf.name", config_file),
- version=functest_utils.get_parameter_from_yaml(
- "vnf.version", config_file),
- )
- self.__logger.debug("VNF configuration: %s", self.vnf)
-
- self.util = Utilvnf()
-
- self.details['test_vnf'] = dict(
- name=functest_utils.get_parameter_from_yaml(
- "vnf_test_suite.name", config_file),
- version=functest_utils.get_parameter_from_yaml(
- "vnf_test_suite.version", config_file)
- )
- self.images = functest_utils.get_parameter_from_yaml(
- "tenant_images", config_file)
- self.__logger.info("Images needed for vrouter: %s", self.images)
-
- @staticmethod
- def run_blocking_ssh_command(ssh, cmd,
- error_msg="Unable to run this command"):
- """Command to run ssh command with the exit status."""
- (_, stdout, stderr) = ssh.exec_command(cmd)
- CloudifyVrouter.__logger.debug("SSH %s stdout: %s", cmd, stdout.read())
- if stdout.channel.recv_exit_status() != 0:
- CloudifyVrouter.__logger.error(
- "SSH %s stderr: %s", cmd, stderr.read())
- raise Exception(error_msg)
-
- def prepare(self):
- super(CloudifyVrouter, self).prepare()
- self.__logger.info("Additional pre-configuration steps")
- self.util.set_credentials(self.snaps_creds)
- self.__logger.info("Upload some OS images if it doesn't exist")
- for image_name, image_file in self.images.iteritems():
- self.__logger.info("image: %s, file: %s", image_name, image_file)
- if image_file and image_name:
- image_creator = OpenStackImage(
- self.snaps_creds,
- ImageConfig(
- name=image_name, image_user='cloud',
- img_format='qcow2', image_file=image_file))
- image_creator.create()
- self.created_object.append(image_creator)
-
- def deploy_orchestrator(self):
- # pylint: disable=too-many-locals,too-many-statements
- """
- Deploy Cloudify Manager.
- network, security group, fip, VM creation
- """
- # network creation
- start_time = time.time()
- self.__logger.info("Creating keypair ...")
- kp_file = os.path.join(self.data_dir, "cloudify_vrouter.pem")
- keypair_settings = KeypairConfig(
- name='cloudify_vrouter_kp-{}'.format(self.uuid),
- private_filepath=kp_file)
- keypair_creator = OpenStackKeypair(self.snaps_creds, keypair_settings)
- keypair_creator.create()
- self.created_object.append(keypair_creator)
-
- self.__logger.info("Creating full network ...")
- subnet_settings = SubnetConfig(
- name='cloudify_vrouter_subnet-{}'.format(self.uuid),
- cidr='10.67.79.0/24')
- network_settings = NetworkConfig(
- name='cloudify_vrouter_network-{}'.format(self.uuid),
- subnet_settings=[subnet_settings])
- network_creator = OpenStackNetwork(self.snaps_creds, network_settings)
- network_creator.create()
- self.created_object.append(network_creator)
- ext_net_name = snaps_utils.get_ext_net_name(self.snaps_creds)
- router_creator = OpenStackRouter(
- self.snaps_creds,
- RouterConfig(
- name='cloudify_vrouter_router-{}'.format(self.uuid),
- external_gateway=ext_net_name,
- internal_subnets=[subnet_settings.name]))
- router_creator.create()
- self.created_object.append(router_creator)
-
- # security group creation
- self.__logger.info("Creating security group for cloudify manager vm")
- sg_rules = list()
- sg_rules.append(
- SecurityGroupRuleConfig(
- sec_grp_name="sg-cloudify-manager-{}".format(self.uuid),
- direction=Direction.ingress,
- protocol=Protocol.tcp, port_range_min=1,
- port_range_max=65535))
- sg_rules.append(
- SecurityGroupRuleConfig(
- sec_grp_name="sg-cloudify-manager-{}".format(self.uuid),
- direction=Direction.ingress,
- protocol=Protocol.udp, port_range_min=1,
- port_range_max=65535))
-
- security_group_creator = OpenStackSecurityGroup(
- self.snaps_creds,
- SecurityGroupConfig(
- name="sg-cloudify-manager-{}".format(self.uuid),
- rule_settings=sg_rules))
-
- security_group_creator.create()
- self.created_object.append(security_group_creator)
-
- # orchestrator VM flavor
- self.__logger.info("Get or create flavor for cloudify manager vm ...")
-
- flavor_settings = FlavorConfig(
- name=self.orchestrator['requirements']['flavor']['name'],
- ram=self.orchestrator['requirements']['flavor']['ram_min'],
- disk=50, vcpus=2)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor_creator.create()
- self.created_object.append(flavor_creator)
- image_settings = ImageConfig(
- name=self.orchestrator['requirements']['os_image'],
- image_user='centos', exists=True)
-
- port_settings = PortConfig(
- name='cloudify_manager_port-{}'.format(self.uuid),
- network_name=network_settings.name)
-
- manager_settings = VmInstanceConfig(
- name='cloudify_manager-{}'.format(self.uuid),
- flavor=flavor_settings.name,
- port_settings=[port_settings],
- security_group_names=[
- security_group_creator.sec_grp_settings.name],
- floating_ip_settings=[FloatingIpConfig(
- name='cloudify_manager_fip-{}'.format(self.uuid),
- port_name=port_settings.name,
- router_name=router_creator.router_settings.name)])
-
- manager_creator = OpenStackVmInstance(
- self.snaps_creds, manager_settings, image_settings,
- keypair_settings)
-
- self.__logger.info("Creating cloudify manager VM")
- manager_creator.create()
- self.created_object.append(manager_creator)
-
- cfy_client = CloudifyClient(
- host=manager_creator.get_floating_ip().ip,
- username='admin', password='admin', tenant='default_tenant')
-
- self.orchestrator['object'] = cfy_client
-
- self.cfy_manager_ip = manager_creator.get_floating_ip().ip
-
- self.__logger.info("Attemps running status of the Manager")
- cfy_status = None
- retry = 10
- while str(cfy_status) != 'running' and retry:
- try:
- cfy_status = cfy_client.manager.get_status()['status']
- self.__logger.debug("The current manager status is %s",
- cfy_status)
- except Exception: # pylint: disable=broad-except
- self.__logger.exception(
- "Cloudify Manager isn't up and running. Retrying ...")
- retry = retry - 1
- time.sleep(30)
-
- if str(cfy_status) == 'running':
- self.__logger.info("Cloudify Manager is up and running")
- else:
- raise Exception("Cloudify Manager isn't up and running")
-
- duration = time.time() - start_time
-
- self.__logger.info("Put private keypair in manager")
- if manager_creator.vm_ssh_active(block=True):
- ssh = manager_creator.ssh_client()
- scp = SCPClient(ssh.get_transport(), socket_timeout=15.0)
- scp.put(kp_file, '~/')
- cmd = "sudo cp ~/cloudify_vrouter.pem /etc/cloudify/"
- self.run_blocking_ssh_command(ssh, cmd)
- cmd = "sudo chmod 444 /etc/cloudify/cloudify_vrouter.pem"
- self.run_blocking_ssh_command(ssh, cmd)
- cmd = "sudo yum install -y gcc python-devel"
- self.run_blocking_ssh_command(
- ssh, cmd, "Unable to install packages on manager")
-
- self.details['orchestrator'].update(status='PASS', duration=duration)
-
- self.vnf['inputs'].update(dict(external_network_name=ext_net_name))
-
- return True
-
- def deploy_vnf(self):
- start_time = time.time()
-
- self.__logger.info("Upload VNFD")
- cfy_client = self.orchestrator['object']
- descriptor = self.vnf['descriptor']
- self.deployment_name = descriptor.get('name')
-
- vrouter_blueprint_dir = os.path.join(
- self.data_dir, self.util.blueprint_dir)
- if not os.path.exists(vrouter_blueprint_dir):
- Repo.clone_from(
- descriptor.get('url'), vrouter_blueprint_dir,
- branch=descriptor.get('version'))
-
- cfy_client.blueprints.upload(
- vrouter_blueprint_dir + self.util.blueprint_file_name,
- descriptor.get('name'))
-
- self.__logger.info("Get or create flavor for vrouter")
- flavor_settings = FlavorConfig(
- name=self.vnf['requirements']['flavor']['name'],
- ram=self.vnf['requirements']['flavor']['ram_min'],
- disk=25, vcpus=1)
- flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
- flavor = flavor_creator.create()
- self.created_object.append(flavor_creator)
-
- # set image name
- glance = glance_utils.glance_client(self.snaps_creds)
- image = glance_utils.get_image(glance, "vyos1.1.7")
-
- user_creator = OpenStackUser(
- self.snaps_creds,
- UserConfig(
- name='cloudify_network_bug-{}'.format(self.uuid),
- password=str(uuid.uuid4()),
- roles={'_member_': self.tenant_name}))
- user_creator.create()
- self.created_object.append(user_creator)
- snaps_creds = user_creator.get_os_creds(self.snaps_creds.project_name)
-
- self.vnf['inputs'].update(dict(target_vnf_image_id=image.id))
- self.vnf['inputs'].update(dict(reference_vnf_image_id=image.id))
- self.vnf['inputs'].update(dict(target_vnf_flavor_id=flavor.id))
- self.vnf['inputs'].update(dict(reference_vnf_flavor_id=flavor.id))
- self.vnf['inputs'].update(dict(
- keystone_username=snaps_creds.username))
- self.vnf['inputs'].update(dict(
- keystone_password=snaps_creds.password))
- self.vnf['inputs'].update(dict(
- keystone_tenant_name=snaps_creds.project_name))
- self.vnf['inputs'].update(dict(
- region=snaps_creds.region_name))
- self.vnf['inputs'].update(dict(
- keystone_url=keystone_utils.get_endpoint(
- snaps_creds, 'identity')))
-
- self.__logger.info("Create VNF Instance")
- cfy_client.deployments.create(
- descriptor.get('name'), descriptor.get('name'),
- self.vnf.get('inputs'))
-
- wait_for_execution(
- cfy_client, get_execution_id(cfy_client, descriptor.get('name')),
- self.__logger, timeout=7200)
-
- self.__logger.info("Start the VNF Instance deployment")
- execution = cfy_client.executions.start(descriptor.get('name'),
- 'install')
- # Show execution log
- execution = wait_for_execution(cfy_client, execution, self.__logger)
-
- duration = time.time() - start_time
-
- self.__logger.info(execution)
- if execution.status == 'terminated':
- self.details['vnf'].update(status='PASS', duration=duration)
- result = True
- else:
- self.details['vnf'].update(status='FAIL', duration=duration)
- result = False
- return result
-
- def test_vnf(self):
- cfy_client = self.orchestrator['object']
- credentials = {"snaps_creds": self.snaps_creds,
- "username": self.snaps_creds.username,
- "password": self.snaps_creds.password,
- "auth_url": self.snaps_creds.auth_url,
- "tenant_name": self.snaps_creds.project_name}
-
- self.util_info = {"credentials": credentials,
- "cfy": cfy_client,
- "vnf_data_dir": self.util.vnf_data_dir}
-
- start_time = time.time()
-
- result, test_result_data = super(CloudifyVrouter, self).test_vnf()
-
- duration = time.time() - start_time
-
- if result:
- self.details['test_vnf'].update(
- status='PASS', result='OK', full_result=test_result_data,
- duration=duration)
- else:
- self.details['test_vnf'].update(
- status='FAIL', result='NG', full_result=test_result_data,
- duration=duration)
-
- return True
-
- def clean(self):
- try:
- cfy_client = self.orchestrator['object']
- dep_name = self.vnf['descriptor'].get('name')
- # kill existing execution
- self.__logger.info('Deleting the current deployment')
- exec_list = cfy_client.executions.list(dep_name)
- for execution in exec_list:
- if execution['status'] == "started":
- try:
- cfy_client.executions.cancel(
- execution['id'], force=True)
- except Exception: # pylint: disable=broad-except
- self.__logger.warn("Can't cancel the current exec")
-
- execution = cfy_client.executions.start(
- dep_name, 'uninstall', parameters=dict(ignore_failure=True))
-
- wait_for_execution(cfy_client, execution, self.__logger)
- cfy_client.deployments.delete(self.vnf['descriptor'].get('name'))
- cfy_client.blueprints.delete(self.vnf['descriptor'].get('name'))
- except Exception: # pylint: disable=broad-except
- self.__logger.warn("Some issue during the undeployment ..")
- self.__logger.warn("Tenant clean continue ..")
- super(CloudifyVrouter, self).clean()
-
- def get_vnf_info_list(self, target_vnf_name):
- return self.util.get_vnf_info_list(
- self.cfy_manager_ip, self.deployment_name, target_vnf_name)
-
-
-def wait_for_execution(client, execution, logger, timeout=7200, ):
- """Wait for a workflow execution on Cloudify Manager."""
- # if execution already ended - return without waiting
- if execution.status in Execution.END_STATES:
- return execution
-
- if timeout is not None:
- deadline = time.time() + timeout
-
- # Poll for execution status and execution logs, until execution ends
- # and we receive an event of type in WORKFLOW_END_TYPES
- offset = 0
- batch_size = 50
- event_list = []
- execution_ended = False
- while True:
- event_list = client.events.list(
- execution_id=execution.id, _offset=offset, _size=batch_size,
- include_logs=False, sort='@timestamp').items
-
- offset = offset + len(event_list)
- for event in event_list:
- logger.debug(event.get('message'))
-
- if timeout is not None:
- if time.time() > deadline:
- raise RuntimeError(
- 'execution of operation {0} for deployment {1} '
- 'timed out'.format(execution.workflow_id,
- execution.deployment_id))
- else:
- # update the remaining timeout
- timeout = deadline - time.time()
-
- if not execution_ended:
- execution = client.executions.get(execution.id)
- execution_ended = execution.status in Execution.END_STATES
-
- if execution_ended:
- break
-
- time.sleep(5)
-
- return execution
-
-
-def get_execution_id(client, deployment_id):
- """
- Get the execution id of a env preparation.
- network, security group, fip, VM creation
- """
- executions = client.executions.list(deployment_id=deployment_id)
- for execution in executions:
- if execution.workflow_id == 'create_deployment_environment':
- return execution
- raise RuntimeError('Failed to get create_deployment_environment '
- 'workflow execution.'
- 'Available executions: {0}'.format(executions))
diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml b/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml
deleted file mode 100644
index 58bdb66a..00000000
--- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.yaml
+++ /dev/null
@@ -1,33 +0,0 @@
----
-tenant_images:
- cloudify_manager_4.0:
- /home/opnfv/functest/images/cloudify-manager-premium-4.0.1.qcow2
- vyos1.1.7: /home/opnfv/functest/images/vyos-1.1.7.img
-test_data:
- url: 'https://github.com/oolorg/opnfv-vnf-data.git'
- branch: 'master'
-orchestrator:
- name: cloudify
- version: '4.0'
- requirements:
- flavor:
- name: m1.medium
- ram_min: 4096
- os_image: 'cloudify_manager_4.0'
-vnf:
- name: vyos1.1.7
- version: '1.1.7'
- descriptor:
- url: https://github.com/oolorg/opnfv-vnf-vyos-blueprint/
- name: vrouter-opnfv
- version: 'master'
- requirements:
- flavor:
- name: m1.medium
- ram_min: 2048
- inputs:
- external_network_name: admin_floating_net
- region: RegionOne
-vnf_test_suite:
- name: vyos-vrouter-test
- version: "1.0"
diff --git a/functest/opnfv_tests/vnf/router/test_controller/__init__.py b/functest/opnfv_tests/vnf/router/test_controller/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/opnfv_tests/vnf/router/test_controller/__init__.py
+++ /dev/null
diff --git a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
deleted file mode 100644
index be7bee88..00000000
--- a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
+++ /dev/null
@@ -1,135 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Okinawa Open Laboratory and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-"""vrouter function test execution module"""
-
-import logging
-import time
-import yaml
-
-from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
-from functest.opnfv_tests.vnf.router.vnf_controller.vnf_controller import (
- VnfController)
-
-
-class FunctionTestExec(object):
- """vrouter function test execution class"""
-
- logger = logging.getLogger(__name__)
-
- def __init__(self, util_info):
- self.logger.debug("init test exec")
- self.util = Utilvnf()
- credentials = util_info["credentials"]
- self.vnf_ctrl = VnfController(util_info)
-
- test_cmd_map_file = open(self.util.vnf_data_dir +
- self.util.opnfv_vnf_data_dir +
- self.util.command_template_dir +
- self.util.test_cmd_map_yaml_file,
- 'r')
- self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
- test_cmd_map_file.close()
-
- self.util.set_credentials(credentials["snaps_creds"])
-
- with open(self.util.test_env_config_yaml) as file_fd:
- test_env_config_yaml = yaml.safe_load(file_fd)
- file_fd.close()
-
- self.protocol_stable_wait = test_env_config_yaml.get("general").get(
- "protocol_stable_wait")
-
- def config_target_vnf(self, target_vnf, reference_vnf, test_kind):
- self.logger.debug("Configuration to target vnf")
- test_info = self.test_cmd_map_yaml[target_vnf["os_type"]]
- test_cmd_file_path = test_info[test_kind]["pre_command_target"]
- target_parameter_file_path = test_info[test_kind]["parameter_target"]
- prompt_file_path = test_info["prompt"]
-
- return self.vnf_ctrl.config_vnf(target_vnf,
- reference_vnf,
- test_cmd_file_path,
- target_parameter_file_path,
- prompt_file_path)
-
- def config_reference_vnf(self, target_vnf, reference_vnf, test_kind):
- self.logger.debug("Configuration to reference vnf")
- test_info = self.test_cmd_map_yaml[reference_vnf["os_type"]]
- test_cmd_file_path = test_info[test_kind]["pre_command_reference"]
- reference_parameter_file_path = test_info[test_kind][
- "parameter_reference"]
- prompt_file_path = test_info["prompt"]
-
- return self.vnf_ctrl.config_vnf(reference_vnf,
- target_vnf,
- test_cmd_file_path,
- reference_parameter_file_path,
- prompt_file_path)
-
- def result_check(self, target_vnf, reference_vnf, test_kind, test_list):
- test_info = self.test_cmd_map_yaml[target_vnf["os_type"]]
- target_parameter_file_path = test_info[test_kind]["parameter_target"]
- prompt_file_path = test_info["prompt"]
- check_rule_file_path_list = []
-
- for test in test_list:
- check_rule_file_path_list.append(test_info[test_kind][test])
-
- return self.vnf_ctrl.result_check(target_vnf,
- reference_vnf,
- check_rule_file_path_list,
- target_parameter_file_path,
- prompt_file_path)
-
- def run(self, target_vnf, reference_vnf_list, test_info, test_list):
- test_result_data = {}
- test_kind = test_info["protocol"]
- for reference_vnf in reference_vnf_list:
- self.logger.debug("Start config command " +
- target_vnf["vnf_name"] + " and " +
- reference_vnf["vnf_name"])
-
- result = self.config_target_vnf(target_vnf,
- reference_vnf,
- test_kind)
- if not result:
- return False, test_result_data
-
- result = self.config_reference_vnf(target_vnf,
- reference_vnf,
- test_kind)
- if not result:
- return False, test_result_data
-
- self.logger.debug("Finish config command.")
-
- self.logger.debug("Waiting for protocol stable.")
- time.sleep(self.protocol_stable_wait)
-
- self.logger.debug("Start check method")
-
- (result, res_dict_data_list) = self.result_check(target_vnf,
- reference_vnf,
- test_kind,
- test_list)
-
- test_result_data = {"test_kind": test_info["test_kind"],
- "protocol": test_info["protocol"],
- "result": res_dict_data_list}
-
- if not result:
- self.logger.debug("Error check method.")
- return False, test_result_data
-
- self.logger.debug("Finish check method.")
-
- return True, test_result_data
diff --git a/functest/opnfv_tests/vnf/router/test_scenario.yaml b/functest/opnfv_tests/vnf/router/test_scenario.yaml
deleted file mode 100644
index 8bdaf06a..00000000
--- a/functest/opnfv_tests/vnf/router/test_scenario.yaml
+++ /dev/null
@@ -1,28 +0,0 @@
----
-test_scenario_list:
- -
- test_type: 'function_test'
- vnf_list:
- -
- vnf_name: 'target_vnf'
- os_type: 'vyos'
- image_name: 'vyos1.1.7'
- flavor_name: 'm1.medium'
- -
- vnf_name: 'reference_vnf'
- os_type: 'vyos'
- image_name: 'vyos1.1.7'
- flavor_name: 'm1.medium'
- function_test_list:
- -
- target_vnf_name: 'target_vnf'
- test_list:
- -
- test_kind: 'Interoperability'
- protocol: 'BGP'
- BGP:
- - 'Checking_the_peer_of_BGP'
- - 'Checking_the_status_of_BGP'
- - 'Checking_the_advertised_routes'
- - 'Checking_the_received_routes'
- - 'Checking_the_routing_table'
diff --git a/functest/opnfv_tests/vnf/router/utilvnf.py b/functest/opnfv_tests/vnf/router/utilvnf.py
deleted file mode 100644
index 6861b386..00000000
--- a/functest/opnfv_tests/vnf/router/utilvnf.py
+++ /dev/null
@@ -1,323 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Okinawa Open Laboratory and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-""" Utility module of vrouter testcase """
-
-import json
-import logging
-import os
-import pkg_resources
-import requests
-import yaml
-
-from functest.utils import config
-from git import Repo
-from requests.auth import HTTPBasicAuth
-from snaps.openstack.utils import nova_utils
-
-
-RESULT_SPRIT_INDEX = {
- "transfer": 8,
- "bandwidth": 6,
- "jitter": 4,
- "los_total": 2,
- "pkt_loss": 1
-}
-
-BIT_PER_BYTE = 8
-
-NOVA_CLIENT_API_VERSION = '2'
-NOVA_CILENT_NETWORK_INFO_INDEX = 0
-CFY_INFO_OUTPUT_FILE = "output.txt"
-
-CIDR_NETWORK_SEGMENT_INFO_INDEX = 0
-PACKET_LOST_INFO_INDEX = 0
-PACKET_TOTAL_INFO_INDEX = 1
-
-NUMBER_OF_DIGITS_FOR_AVG_TRANSFER = 0
-NUMBER_OF_DIGITS_FOR_AVG_BANDWIDTH = 0
-NUMBER_OF_DIGITS_FOR_AVG_JITTER = 3
-NUMBER_OF_DIGITS_FOR_AVG_PKT_LOSS = 1
-
-
-class Utilvnf(object): # pylint: disable=too-many-instance-attributes
- """ Utility class of vrouter testcase """
-
- logger = logging.getLogger(__name__)
-
- def __init__(self):
- self.snaps_creds = ""
- self.vnf_data_dir = getattr(config.CONF, 'dir_router_data')
- self.opnfv_vnf_data_dir = "opnfv-vnf-data/"
- self.command_template_dir = "command_template/"
- self.test_scenario_yaml = "test_scenario.yaml"
- test_env_config_yaml_file = "test_env_config.yaml"
- self.test_cmd_map_yaml_file = "test_cmd_map.yaml"
- self.test_env_config_yaml = os.path.join(
- self.vnf_data_dir,
- self.opnfv_vnf_data_dir,
- test_env_config_yaml_file)
-
- self.blueprint_dir = "opnfv-vnf-vyos-blueprint/"
- self.blueprint_file_name = "function-test-openstack-blueprint.yaml"
-
- if not os.path.exists(self.vnf_data_dir):
- os.makedirs(self.vnf_data_dir)
-
- case_dir = pkg_resources.resource_filename(
- 'functest', 'opnfv_tests/vnf/router')
-
- config_file_name = getattr(
- config.CONF, 'vnf_{}_config'.format("vyos_vrouter"))
-
- config_file = os.path.join(case_dir, config_file_name)
-
- with open(config_file) as file_fd:
- vrouter_config_yaml = yaml.safe_load(file_fd)
- file_fd.close()
-
- test_data = vrouter_config_yaml.get("test_data")
-
- self.logger.debug("Downloading the test data.")
- vrouter_data_path = self.vnf_data_dir + self.opnfv_vnf_data_dir
-
- if not os.path.exists(vrouter_data_path):
- Repo.clone_from(test_data['url'],
- vrouter_data_path,
- branch=test_data['branch'])
-
- with open(self.test_env_config_yaml) as file_fd:
- test_env_config_yaml = yaml.safe_load(file_fd)
- file_fd.close()
-
- self.image = test_env_config_yaml.get(
- "general").get("images").get("vyos")
- self.tester_image = test_env_config_yaml.get(
- "general").get("images").get("tester_vm_os")
-
- self.test_result_json_file = "test_result.json"
- if os.path.isfile(self.test_result_json_file):
- os.remove(self.test_result_json_file)
- self.logger.debug("removed %s", self.test_result_json_file)
-
- def get_nova_client(self):
- nova_client = nova_utils.nova_client(self.snaps_creds)
-
- return nova_client
-
- def set_credentials(self, snaps_creds):
- self.snaps_creds = snaps_creds
-
- def get_address(self, server_name, network_name):
- nova_client = self.get_nova_client()
- servers_list = nova_client.servers.list()
- server = None
-
- for server in servers_list:
- if server.name == server_name:
- break
-
- address = server.addresses[
- network_name][NOVA_CILENT_NETWORK_INFO_INDEX]["addr"]
-
- return address
-
- def get_mac_address(self, server_name, network_name):
- nova_client = self.get_nova_client()
- servers_list = nova_client.servers.list()
- server = None
-
- for server in servers_list:
- if server.name == server_name:
- break
-
- mac_address = server.addresses[network_name][
- NOVA_CILENT_NETWORK_INFO_INDEX]["OS-EXT-IPS-MAC:mac_addr"]
-
- return mac_address
-
- def reboot_vm(self, server_name):
- nova_client = self.get_nova_client()
- servers_list = nova_client.servers.list()
- server = None
-
- for server in servers_list:
- if server.name == server_name:
- break
-
- server.reboot()
-
- return
-
- def delete_vm(self, server_name):
- nova_client = self.get_nova_client()
- servers_list = nova_client.servers.list()
- server = None
-
- for server in servers_list:
- if server.name == server_name:
- nova_client.servers.delete(server)
- break
-
- return
-
- def get_blueprint_outputs(self, cfy_manager_ip, deployment_name):
- url = "http://%s/deployments/%s/outputs" % (
- cfy_manager_ip, deployment_name)
-
- response = requests.get(
- url,
- auth=HTTPBasicAuth('admin', 'admin'),
- headers={'Tenant': 'default_tenant'})
-
- resp_data = response.json()
- self.logger.debug(resp_data)
- data = resp_data["outputs"]
- return data
-
- def get_blueprint_outputs_vnfs(self, cfy_manager_ip, deployment_name):
- outputs = self.get_blueprint_outputs(cfy_manager_ip,
- deployment_name)
- vnfs = outputs["vnfs"]
- vnf_list = []
- for vnf_name in vnfs:
- vnf_list.append(vnfs[vnf_name])
- return vnf_list
-
- def get_blueprint_outputs_networks(self, cfy_manager_ip, deployment_name):
- outputs = self.get_blueprint_outputs(cfy_manager_ip,
- deployment_name)
- networks = outputs["networks"]
- network_list = []
- for network_name in networks:
- network_list.append(networks[network_name])
- return network_list
-
- def request_vnf_reboot(self, vnf_info_list):
- for vnf in vnf_info_list:
- self.logger.debug("reboot the " + vnf["vnf_name"])
- self.reboot_vm(vnf["vnf_name"])
-
- def request_vm_delete(self, vnf_info_list):
- for vnf in vnf_info_list:
- self.logger.debug("delete the " + vnf["vnf_name"])
- self.delete_vm(vnf["vnf_name"])
-
- def get_vnf_info_list(self, cfy_manager_ip, topology_deploy_name,
- target_vnf_name):
- network_list = self.get_blueprint_outputs_networks(
- cfy_manager_ip,
- topology_deploy_name)
- vnf_info_list = self.get_blueprint_outputs_vnfs(cfy_manager_ip,
- topology_deploy_name)
- for vnf in vnf_info_list:
- vnf_name = vnf["vnf_name"]
- vnf["os_type"] = self.image["os_type"]
- vnf["user"] = self.image["user"]
- vnf["pass"] = self.image["pass"]
-
- vnf["target_vnf_flag"] = bool(vnf_name == target_vnf_name)
-
- self.logger.debug("vnf name : " + vnf_name)
- self.logger.debug(vnf_name + " floating ip address : " +
- vnf["floating_ip"])
-
- for network in network_list:
- network_name = network["network_name"]
- ip_address = self.get_address(vnf["vnf_name"],
- network["network_name"])
- vnf[network_name + "_ip"] = ip_address
- mac = self.get_mac_address(vnf["vnf_name"],
- network["network_name"])
- vnf[network_name + "_mac"] = mac
-
- self.logger.debug(network_name + "_ip of " + vnf["vnf_name"] +
- " : " + vnf[network_name + "_ip"])
- self.logger.debug(network_name + "_mac of " + vnf["vnf_name"] +
- " : " + vnf[network_name + "_mac"])
-
- return vnf_info_list
-
- @staticmethod
- def get_target_vnf(vnf_info_list):
- for vnf in vnf_info_list:
- if vnf["target_vnf_flag"]:
- return vnf
-
- return None
-
- @staticmethod
- def get_reference_vnf_list(vnf_info_list):
- reference_vnf_list = []
- for vnf in vnf_info_list:
- if not vnf["target_vnf_flag"]:
- reference_vnf_list.append(vnf)
-
- return reference_vnf_list
-
- @staticmethod
- def get_vnf_info(vnf_info_list, vnf_name):
- for vnf in vnf_info_list:
- if vnf["vnf_name"] == vnf_name:
- return vnf
-
- return None
-
- @staticmethod
- def convert_functional_test_result(result_data_list):
- result = {}
- for result_data in result_data_list:
- test_kind = result_data["test_kind"]
- protocol = result_data["protocol"]
- test_result_data = result_data["result"]
-
- if test_kind not in result:
- result[test_kind] = []
-
- result[test_kind].append({protocol: test_result_data})
-
- return {"Functional_test": result}
-
- def write_result_data(self, result_data):
- test_result = []
- if not os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "w")
- file_fd.close()
- else:
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
-
- test_result.append(result_data)
-
- file_fd = open(self.test_result_json_file, "w")
- json.dump(test_result, file_fd)
- file_fd.close()
-
- def output_test_result_json(self):
- if os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
- output_json_data = json.dumps(test_result,
- sort_keys=True,
- indent=4)
- self.logger.debug("test_result %s", output_json_data)
- else:
- self.logger.debug("Not found %s", self.test_result_json_file)
-
- @staticmethod
- def get_test_scenario(file_path):
- test_scenario_file = open(file_path,
- 'r')
- test_scenario_yaml = yaml.safe_load(test_scenario_file)
- test_scenario_file.close()
- return test_scenario_yaml["test_scenario_list"]
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/__init__.py b/functest/opnfv_tests/vnf/router/vnf_controller/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/opnfv_tests/vnf/router/vnf_controller/__init__.py
+++ /dev/null
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/checker.py b/functest/opnfv_tests/vnf/router/vnf_controller/checker.py
deleted file mode 100644
index a7a70f6d..00000000
--- a/functest/opnfv_tests/vnf/router/vnf_controller/checker.py
+++ /dev/null
@@ -1,63 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Okinawa Open Laboratory and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-"""vrouter test result check module"""
-
-import json
-import logging
-import re
-
-from jinja2 import Environment, FileSystemLoader
-
-
-class Checker(object):
- """vrouter test result check class"""
-
- logger = logging.getLogger(__name__)
-
- def __init__(self):
- self.logger.debug("init checker")
-
- @staticmethod
- def load_check_rule(rule_file_dir, rule_file_name, parameter):
- loader = FileSystemLoader(rule_file_dir,
- encoding='utf8')
- env = Environment(loader=loader)
- check_rule_template = env.get_template(rule_file_name)
- check_rule = check_rule_template.render(parameter)
- check_rule_data = json.loads(check_rule)
- return check_rule_data
-
- @staticmethod
- def regexp_information(response, rules):
- status = False
- result_data = {}
-
- for rule in rules["rules"]:
- result_data = {
- "test_name": rule["description"],
- "result": "NG"
- }
-
- match = re.search(rule["regexp"],
- response)
- rule["response"] = response
- if match is None:
- status = False
- break
-
- if not match.group(1) == rule["result"]:
- status = False
- else:
- result_data["result"] = "OK"
- status = True
-
- return status, result_data
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py b/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py
deleted file mode 100644
index 7d9116bc..00000000
--- a/functest/opnfv_tests/vnf/router/vnf_controller/command_generator.py
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Okinawa Open Laboratory and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-"""command generator module for vrouter testing"""
-
-import logging
-from jinja2 import Environment, FileSystemLoader
-
-
-class CommandGenerator(object):
- """command generator class for vrouter testing"""
-
- logger = logging.getLogger(__name__)
-
- def __init__(self):
- self.logger.debug("init command generator")
-
- @staticmethod
- def load_template(template_dir, template):
- # pylint disable=missing-docstring
- loader = FileSystemLoader(template_dir,
- encoding='utf8')
- env = Environment(loader=loader)
- return env.get_template(template)
-
- @staticmethod
- def command_create(template, parameter):
- # pylint disable=missing-docstring
- commands = template.render(parameter)
- return commands.split('\n')
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
deleted file mode 100644
index 628afd30..00000000
--- a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
+++ /dev/null
@@ -1,135 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Okinawa Open Laboratory and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""ssh client module for vrouter testing"""
-
-import logging
-import time
-import yaml
-
-import paramiko
-
-from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
-
-RECEIVE_ROOP_WAIT = 1
-
-DEFAULT_CONNECT_TIMEOUT = 10
-DEFAULT_CONNECT_RETRY_COUNT = 10
-DEFAULT_SEND_TIMEOUT = 10
-
-
-class SshClient(object): # pylint: disable=too-many-instance-attributes
- """ssh client class for vrouter testing"""
-
- logger = logging.getLogger(__name__)
-
- def __init__(self, ip_address, user, password=None, key_filename=None):
- self.ip_address = ip_address
- self.user = user
- self.password = password
- self.key_filename = key_filename
- self.connected = False
- self.shell = None
-
- self.logger.setLevel(logging.INFO)
-
- self.ssh = paramiko.SSHClient()
- self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-
- self.util = Utilvnf()
- with open(self.util.test_env_config_yaml) as file_fd:
- test_env_config_yaml = yaml.safe_load(file_fd)
- file_fd.close()
-
- self.ssh_revieve_buff = test_env_config_yaml.get("general").get(
- "ssh_receive_buffer")
-
- def connect(self, time_out=DEFAULT_CONNECT_TIMEOUT,
- retrycount=DEFAULT_CONNECT_RETRY_COUNT):
- # pylint: disable=missing-docstring
- while retrycount > 0:
- try:
- self.logger.info("SSH connect to %s.", self.ip_address)
- self.ssh.connect(self.ip_address,
- username=self.user,
- password=self.password,
- key_filename=self.key_filename,
- timeout=time_out,
- look_for_keys=False,
- allow_agent=False)
-
- self.logger.info("SSH connection established to %s.",
- self.ip_address)
-
- self.shell = self.ssh.invoke_shell()
-
- while not self.shell.recv_ready():
- time.sleep(RECEIVE_ROOP_WAIT)
-
- self.shell.recv(self.ssh_revieve_buff)
- break
- except Exception: # pylint: disable=broad-except
- self.logger.info("SSH timeout for %s...", self.ip_address)
- time.sleep(time_out)
- retrycount -= 1
-
- if retrycount == 0:
- self.logger.error("Cannot establish connection to IP '%s'. " +
- "Aborting",
- self.ip_address)
- self.connected = False
- return self.connected
-
- self.connected = True
- return self.connected
-
- def send(self, cmd, prompt, timeout=DEFAULT_SEND_TIMEOUT):
- # pylint: disable=missing-docstring
- if self.connected is True:
- self.shell.settimeout(timeout)
- self.logger.debug("Commandset : '%s'", cmd)
-
- try:
- self.shell.send(cmd + '\n')
- except Exception: # pylint: disable=broad-except
- self.logger.error("ssh send timeout : Command : '%s'", cmd)
- return None
-
- res_buff = ''
- while not res_buff.endswith(prompt):
- time.sleep(RECEIVE_ROOP_WAIT)
- try:
- res = self.shell.recv(self.ssh_revieve_buff)
- except Exception: # pylint: disable=broad-except
- self.logger.error("ssh receive timeout : Command : '%s'",
- cmd)
- break
-
- res_buff += res
-
- self.logger.debug("Response : '%s'", res_buff)
- return res_buff
- self.logger.error("Cannot connected to IP '%s'.", self.ip_address)
- return None
-
- def close(self):
- # pylint: disable=missing-docstring
- if self.connected is True:
- self.ssh.close()
-
- @staticmethod
- def error_check(response, err_strs=None):
- # pylint: disable=missing-docstring
- if err_strs is None:
- err_strs = ["error", "warn", "unknown command", "already exist"]
- for err in err_strs:
- if err in response:
- return False
-
- return True
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
deleted file mode 100644
index 10e48645..00000000
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
+++ /dev/null
@@ -1,146 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Okinawa Open Laboratory and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-"""vm controll module"""
-
-import logging
-import os
-import time
-import yaml
-
-from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
-from functest.opnfv_tests.vnf.router.vnf_controller.command_generator import (
- CommandGenerator)
-from functest.opnfv_tests.vnf.router.vnf_controller.ssh_client import (
- SshClient)
-
-
-class VmController(object):
- """vm controll class"""
-
- logger = logging.getLogger(__name__)
-
- def __init__(self, util_info):
- self.logger.debug("initialize vm controller")
- self.command_gen = CommandGenerator()
- credentials = util_info["credentials"]
-
- self.util = Utilvnf()
- self.util.set_credentials(credentials["snaps_creds"])
-
- with open(self.util.test_env_config_yaml) as file_fd:
- test_env_config_yaml = yaml.safe_load(file_fd)
- file_fd.close()
-
- self.reboot_wait = test_env_config_yaml.get("general").get(
- "reboot_wait")
- self.command_wait = test_env_config_yaml.get("general").get(
- "command_wait")
- self.ssh_connect_timeout = test_env_config_yaml.get("general").get(
- "ssh_connect_timeout")
- self.ssh_connect_retry_count = test_env_config_yaml.get("general").get(
- "ssh_connect_retry_count")
-
- def command_gen_from_template(self, command_file_path, cmd_input_param):
- (command_file_dir, command_file_name) = os.path.split(
- command_file_path)
- template = self.command_gen.load_template(command_file_dir,
- command_file_name)
- return self.command_gen.command_create(template,
- cmd_input_param)
-
- def config_vm(self, vm_info, test_cmd_file_path,
- cmd_input_param, prompt_file_path):
- ssh = self.connect_ssh_and_config_vm(vm_info,
- test_cmd_file_path,
- cmd_input_param,
- prompt_file_path)
- if ssh is None:
- return False
-
- ssh.close()
-
- return True
-
- def connect_ssh_and_config_vm(self, vm_info, test_cmd_file_path,
- cmd_input_param, prompt_file_path):
-
- key_filename = None
- if "key_path" in vm_info:
- key_filename = vm_info["key_path"]
-
- ssh = SshClient(ip_address=vm_info["floating_ip"],
- user=vm_info["user"],
- password=vm_info["pass"],
- key_filename=key_filename)
-
- result = ssh.connect(self.ssh_connect_timeout,
- self.ssh_connect_retry_count)
- if not result:
- self.logger.debug("try to vm reboot.")
- self.util.reboot_vm(vm_info["vnf_name"])
- time.sleep(self.reboot_wait)
- result = ssh.connect(self.ssh_connect_timeout,
- self.ssh_connect_retry_count)
- if not result:
- return None
-
- (result, _) = self.command_create_and_execute(
- ssh,
- test_cmd_file_path,
- cmd_input_param,
- prompt_file_path)
- if not result:
- ssh.close()
- return None
-
- return ssh
-
- def command_create_and_execute(self, ssh, test_cmd_file_path,
- cmd_input_param, prompt_file_path):
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
- config_mode_prompt = prompt["config_mode"]
-
- commands = self.command_gen_from_template(test_cmd_file_path,
- cmd_input_param)
- return self.command_list_execute(ssh,
- commands,
- config_mode_prompt)
-
- def command_list_execute(self, ssh, command_list, prompt):
- res_data_list = []
- for command in command_list:
- self.logger.debug("Command : " + command)
- (res, res_data) = self.command_execute(ssh,
- command,
- prompt)
- self.logger.debug("Response : " + res_data)
- res_data_list.append(res_data)
- if not res:
- return res, res_data_list
-
- time.sleep(self.command_wait)
-
- return True, res_data_list
-
- def command_execute(self, ssh, command, prompt):
- res_data = ssh.send(command, prompt)
- if res_data is None:
- self.logger.info("retry send command : " + command)
- res_data = ssh.send(command,
- prompt)
- if not ssh.error_check(res_data):
- return False, res_data
-
- return True, res_data
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
deleted file mode 100644
index a5b1ad85..00000000
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
+++ /dev/null
@@ -1,141 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Okinawa Open Laboratory and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-"""vrouter controll module"""
-
-import logging
-import os
-import time
-import yaml
-
-import prettytable
-
-from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
-from functest.opnfv_tests.vnf.router.vnf_controller.checker import Checker
-from functest.opnfv_tests.vnf.router.vnf_controller.ssh_client import (
- SshClient)
-from functest.opnfv_tests.vnf.router.vnf_controller.vm_controller import (
- VmController)
-
-
-class VnfController(object):
- """vrouter controll class"""
-
- logger = logging.getLogger(__name__)
-
- def __init__(self, util_info):
- self.logger.debug("init vnf controller")
- self.util = Utilvnf()
- self.vm_controller = VmController(util_info)
-
- with open(self.util.test_env_config_yaml) as file_fd:
- test_env_config_yaml = yaml.safe_load(file_fd)
- file_fd.close()
-
- self.cmd_wait = test_env_config_yaml.get("general").get("command_wait")
- self.ssh_connect_timeout = test_env_config_yaml.get("general").get(
- "ssh_connect_timeout")
- self.ssh_connect_retry_count = test_env_config_yaml.get("general").get(
- "ssh_connect_retry_count")
-
- def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path,
- parameter_file_path, prompt_file_path):
- # pylint: disable=too-many-arguments
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
-
- cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"]
- cmd_input_param["source_ip"] = source_vnf["data_plane_network_ip"]
- cmd_input_param["destination_ip"] = destination_vnf[
- "data_plane_network_ip"]
-
- return self.vm_controller.config_vm(source_vnf,
- test_cmd_file_path,
- cmd_input_param,
- prompt_file_path)
-
- def result_check(self, target_vnf, reference_vnf,
- check_rule_file_path_list, parameter_file_path,
- prompt_file_path):
- # pylint: disable=too-many-arguments,too-many-locals
-
- res_dict_data_list = []
-
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
-
- cmd_input_param["source_ip"] = target_vnf["data_plane_network_ip"]
- cmd_input_param["destination_ip"] = reference_vnf[
- "data_plane_network_ip"]
-
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
- terminal_mode_prompt = prompt["terminal_mode"]
-
- ssh = SshClient(target_vnf["floating_ip"],
- target_vnf["user"],
- target_vnf["pass"])
-
- result = ssh.connect(self.ssh_connect_timeout,
- self.ssh_connect_retry_count)
- if not result:
- return False, res_dict_data_list
-
- checker = Checker()
-
- res_table = prettytable.PrettyTable(
- header_style='upper', padding_width=5,
- field_names=['test item', 'result'])
-
- status = True
- res_data_list = []
- for check_rule_file_path in check_rule_file_path_list:
- (check_rule_dir, check_rule_file) = os.path.split(
- check_rule_file_path)
- check_rules = checker.load_check_rule(check_rule_dir,
- check_rule_file,
- cmd_input_param)
- (res, res_data) = self.vm_controller.command_execute(
- ssh,
- check_rules["command"],
- terminal_mode_prompt)
- res_data_list.append(res_data)
- if not res:
- status = False
- break
-
- (res, res_dict_data) = checker.regexp_information(res_data,
- check_rules)
- res_dict_data_list.append(res_dict_data)
- res_table.add_row([res_dict_data["test_name"],
- res_dict_data["result"]])
- if not res:
- status = False
-
- time.sleep(self.cmd_wait)
-
- ssh.close()
-
- self.logger.info("Test result:\n\n%s\n", res_table.get_string())
-
- self.output_check_result_detail_data(res_data_list)
-
- return status, res_dict_data_list
-
- def output_check_result_detail_data(self, res_data_list):
- for res_data in res_data_list:
- self.logger.debug(res_data)
diff --git a/functest/opnfv_tests/vnf/router/vrouter_base.py b/functest/opnfv_tests/vnf/router/vrouter_base.py
deleted file mode 100644
index 8818032d..00000000
--- a/functest/opnfv_tests/vnf/router/vrouter_base.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Okinawa Open Laboratory and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-"""vrouter testing base class module"""
-
-import datetime
-import json
-import logging
-import os
-import time
-
-import pkg_resources
-
-import functest.core.vnf as vnf
-from functest.utils import config
-from functest.opnfv_tests.vnf.router.test_controller import function_test_exec
-from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
-
-__author__ = "Shuya Nakama <shuya.nakama@okinawaopenlabs.org>"
-
-REBOOT_WAIT = 30
-
-
-class VrouterOnBoardingBase(vnf.VnfOnBoarding):
- """vrouter testing base class"""
-
- def __init__(self, **kwargs):
- self.logger = logging.getLogger(__name__)
- super(VrouterOnBoardingBase, self).__init__(**kwargs)
- self.case_dir = pkg_resources.resource_filename(
- 'functest', 'opnfv_tests/vnf/router')
- self.data_dir = getattr(config.CONF, 'dir_router_data')
- self.result_dir = os.path.join(getattr(config.CONF, 'dir_results'),
- self.case_name)
- self.util = Utilvnf()
- self.util_info = {}
-
- self.vnf_list = []
-
- if not os.path.exists(self.data_dir):
- os.makedirs(self.data_dir)
- if not os.path.exists(self.result_dir):
- os.makedirs(self.result_dir)
-
- def test_vnf(self):
- """vrouter test execution"""
- result = False
- test_result_data_list = []
- test_scenario_file_path = os.path.join(self.case_dir,
- self.util.test_scenario_yaml)
- test_scenario_list = self.util.get_test_scenario(
- test_scenario_file_path)
- for test_scenario in test_scenario_list:
- if test_scenario["test_type"] == "function_test":
- function_test_list = test_scenario["function_test_list"]
- for function_test in function_test_list:
- test_list = function_test["test_list"]
- target_vnf_name = function_test["target_vnf_name"]
- for test_info in test_list:
- self.logger.info(test_info["protocol"] + " " +
- test_info["test_kind"] +
- " test.")
- (result, result_data) = self.function_test_vrouter(
- target_vnf_name, test_info)
- test_result_data_list.append(result_data)
- if not result:
- break
-
- self.util.request_vm_delete(self.vnf_list)
-
- test_result_data = json.dumps(test_result_data_list, indent=4)
-
- return result, test_result_data
-
- def function_test_vrouter(self, target_vnf_name, test_info):
- """function test execution"""
-
- test_protocol = test_info["protocol"]
- test_list = test_info[test_protocol]
-
- vnf_info_list = self.get_vnf_info_list(target_vnf_name)
- self.vnf_list = vnf_info_list
-
- self.logger.debug("request vnf's reboot.")
- self.util.request_vnf_reboot(vnf_info_list)
- time.sleep(REBOOT_WAIT)
-
- target_vnf = self.util.get_target_vnf(vnf_info_list)
-
- reference_vnf_list = self.util.get_reference_vnf_list(vnf_info_list)
-
- test_exec = function_test_exec.FunctionTestExec(self.util_info)
-
- # start test
- start_time_ts = time.time()
- self.logger.info("vRouter test Start Time:'%s'", (
- datetime.datetime.fromtimestamp(start_time_ts).strftime(
- '%Y-%m-%d %H:%M:%S')))
-
- (result, test_result_data) = test_exec.run(target_vnf,
- reference_vnf_list,
- test_info,
- test_list)
-
- end_time_ts = time.time()
- duration = round(end_time_ts - start_time_ts, 1)
- self.logger.info("vRouter test duration :'%s'", duration)
-
- return result, test_result_data
-
- def get_vnf_info_list(self, target_vnf_name):
- # pylint: disable=unused-argument,no-self-use
- vnf_info_list = []
- return vnf_info_list