diff options
Diffstat (limited to 'functest/opnfv_tests/openstack/cinder/cinder_test.py')
-rw-r--r-- | functest/opnfv_tests/openstack/cinder/cinder_test.py | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/functest/opnfv_tests/openstack/cinder/cinder_test.py b/functest/opnfv_tests/openstack/cinder/cinder_test.py new file mode 100644 index 000000000..7d8c0a0bd --- /dev/null +++ b/functest/opnfv_tests/openstack/cinder/cinder_test.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python + +# Copyright (c) 2018 Enea AB and others + +# This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +"""CinderCheck testcase.""" + +import logging + +import pkg_resources +from scp import SCPClient +from xtesting.core import testcase + +from functest.core import singlevm +from functest.utils import config +from functest.utils import env + + +class CinderCheck(singlevm.SingleVm2): + """ + CinderCheck testcase implementation. + + Class to execute the CinderCheck test using 2 Floating IPs + to connect to the VMs and one data volume + """ + # pylint: disable=too-many-instance-attributes + volume_timeout = 60 + + def __init__(self, **kwargs): + """Initialize testcase.""" + if "case_name" not in kwargs: + kwargs["case_name"] = "cinder_test" + super().__init__(**kwargs) + self.logger = logging.getLogger(__name__) + self.vm2 = None + self.fip2 = None + self.ssh2 = None + self.volume = None + + def execute(self): + """Execute CinderCheck testcase. + + Sets up the OpenStack keypair, router, security group, and VM instance + objects then validates cinder. + :return: the exit code from the super.execute() method + """ + return self._write_data() or self._read_data() + + def prepare(self): + super().prepare() + self.vm2 = self.boot_vm( + f'{self.case_name}-vm2_{self.guid}', + key_name=self.keypair.id, + security_groups=[self.sec.id]) + (self.fip2, self.ssh2) = self.connect(self.vm2) + self.volume = self.cloud.create_volume( + name=f'{self.case_name}-volume_{self.guid}', size='2', + timeout=self.volume_timeout, wait=True) + + def _write_data(self): + assert self.cloud + self.cloud.attach_volume(self.sshvm, self.volume, + timeout=self.volume_timeout) + write_data_script = pkg_resources.resource_filename( + 'functest.opnfv_tests.openstack.cinder', 'write_data.sh') + try: + scp = SCPClient(self.ssh.get_transport()) + scp.put(write_data_script, remote_path="~/") + except Exception: # pylint: disable=broad-except + self.logger.error("File not transfered!") + return testcase.TestCase.EX_RUN_ERROR + self.logger.debug("ssh: %s", self.ssh) + (_, stdout, stderr) = self.ssh.exec_command( + f"sh ~/write_data.sh {env.get('VOLUME_DEVICE_NAME')}") + self.logger.debug( + "volume_write stdout: %s", stdout.read().decode("utf-8")) + self.logger.debug( + "volume_write stderr: %s", stderr.read().decode("utf-8")) + # Detach volume from VM 1 + self.logger.info("Detach volume from VM 1") + self.cloud.detach_volume( + self.sshvm, self.volume, timeout=self.volume_timeout) + return stdout.channel.recv_exit_status() + + def _read_data(self): + assert self.cloud + # Attach volume to VM 2 + self.logger.info("Attach volume to VM 2") + self.cloud.attach_volume(self.vm2, self.volume, + timeout=self.volume_timeout) + # Check volume data + read_data_script = pkg_resources.resource_filename( + 'functest.opnfv_tests.openstack.cinder', 'read_data.sh') + try: + scp = SCPClient(self.ssh2.get_transport()) + scp.put(read_data_script, remote_path="~/") + except Exception: # pylint: disable=broad-except + self.logger.error("File not transfered!") + return testcase.TestCase.EX_RUN_ERROR + self.logger.debug("ssh: %s", self.ssh2) + (_, stdout, stderr) = self.ssh2.exec_command( + f"sh ~/read_data.sh {env.get('VOLUME_DEVICE_NAME')}") + self.logger.debug( + "read volume stdout: %s", stdout.read().decode("utf-8")) + self.logger.debug( + "read volume stderr: %s", stderr.read().decode("utf-8")) + self.logger.info("Detach volume from VM 2") + self.cloud.detach_volume( + self.vm2, self.volume, timeout=self.volume_timeout) + return stdout.channel.recv_exit_status() + + def clean(self): + assert self.cloud + if self.vm2: + self.cloud.delete_server( + self.vm2, wait=True, + timeout=getattr(config.CONF, 'vping_vm_delete_timeout')) + if self.fip2: + self.cloud.delete_floating_ip(self.fip2.id) + if self.volume: + self.cloud.delete_volume(self.volume.id) + super().clean() |