aboutsummaryrefslogtreecommitdiffstats
path: root/functest/opnfv_tests/openstack/cinder/cinder_test.py
blob: d81bb100af32de04835090cffff9180581ca3a86 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python

# Copyright (c) 2018 Enea AB and others

# This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
#
# http://www.apache.org/licenses/LICENSE-2.0

"""CinderCheck testcase."""

import logging

import pkg_resources
from scp import SCPClient
from xtesting.core import testcase

from functest.core import singlevm
from functest.utils import config
from functest.utils import env


class CinderCheck(singlevm.SingleVm2):
    """
    CinderCheck testcase implementation.

    Class to execute the CinderCheck test using 2 Floating IPs
    to connect to the VMs and one data volume
    """
    # pylint: disable=too-many-instance-attributes

    # Timeout handed to every volume create/attach/detach call below
    # (presumably seconds -- confirm against the cloud client API).
    volume_timeout = 60

    def __init__(self, **kwargs):
        """Initialize testcase.

        The case name defaults to "cinder_test" when the caller does not
        supply one. The second VM, its floating IP, its ssh client and the
        data volume start as None and are filled in by prepare().
        """
        if "case_name" not in kwargs:
            kwargs["case_name"] = "cinder_test"
        super(CinderCheck, self).__init__(**kwargs)
        self.logger = logging.getLogger(__name__)
        self.vm2 = None
        self.fip2 = None
        self.ssh2 = None
        self.volume = None

    def execute(self):
        """Execute CinderCheck testcase.

        Runs the write step against the first VM and, only when that step
        returns a falsy status, the read step against the second VM.

        :return: the status of the write step when it is truthy (failure),
                 otherwise the status of the read step
        """
        return self._write_data() or self._read_data()

    def prepare(self):
        """Create the resources needed on top of the parent's prepare().

        Boots a second VM using the keypair and security group held by
        this instance, connects to it (storing the floating IP and ssh
        client returned by connect()) and creates the data volume, waiting
        for the creation to finish within volume_timeout.
        """
        super(CinderCheck, self).prepare()
        self.vm2 = self.boot_vm(
            '{}-vm2_{}'.format(self.case_name, self.guid),
            key_name=self.keypair.id,
            security_groups=[self.sec.id])
        (self.fip2, self.ssh2) = self.connect(self.vm2)
        # NOTE(review): size is passed as the string '2' (presumably GB);
        # left untouched, confirm the cloud client accepts it as intended.
        self.volume = self.cloud.create_volume(
            name='{}-volume_{}'.format(self.case_name, self.guid), size='2',
            timeout=self.volume_timeout, wait=True)

    def _run_on_volume(self, server, ssh, script, log_prefix, vm_label):
        # pylint: disable=too-many-arguments
        """Attach the volume to a server, run a packaged script, detach.

        The script is copied to the remote home directory over SCP and
        executed with sh, receiving the VOLUME_DEVICE_NAME env value as its
        only argument. The volume is always detached again, even when the
        copy fails or the remote execution raises, so that it is never
        left attached when the method exits.

        :param server: server the volume is attached to
        :param ssh: ssh client connected to that server
        :param script: file name of the script shipped in the
                       functest.opnfv_tests.openstack.cinder package
        :param log_prefix: prefix of the stdout/stderr debug messages
        :param vm_label: human readable VM name used in info messages
        :return: TestCase.EX_RUN_ERROR if the script could not be copied,
                 otherwise the exit status of the remote script
        """
        script_path = pkg_resources.resource_filename(
            'functest.opnfv_tests.openstack.cinder', script)
        self.logger.info("Attach volume to %s", vm_label)
        self.cloud.attach_volume(server, self.volume,
                                 timeout=self.volume_timeout)
        try:
            try:
                scp = SCPClient(ssh.get_transport())
                scp.put(script_path, remote_path="~/")
            except Exception:  # pylint: disable=broad-except
                # Keep the traceback: the bare message alone hides the
                # reason the transfer failed.
                self.logger.exception("File not transfered!")
                return testcase.TestCase.EX_RUN_ERROR
            self.logger.debug("ssh: %s", ssh)
            (_, stdout, stderr) = ssh.exec_command(
                "sh ~/{} {}".format(script, env.get('VOLUME_DEVICE_NAME')))
            self.logger.debug(
                "%s stdout: %s", log_prefix, stdout.read().decode("utf-8"))
            self.logger.debug(
                "%s stderr: %s", log_prefix, stderr.read().decode("utf-8"))
            return stdout.channel.recv_exit_status()
        finally:
            # Runs on success, on the early return above and on errors.
            self.logger.info("Detach volume from %s", vm_label)
            self.cloud.detach_volume(
                server, self.volume, timeout=self.volume_timeout)

    def _write_data(self):
        """Run write_data.sh on the first VM with the volume attached.

        :return: TestCase.EX_RUN_ERROR if the script could not be copied,
                 otherwise the exit status of write_data.sh
        """
        assert self.cloud
        return self._run_on_volume(
            self.sshvm, self.ssh, 'write_data.sh', 'volume_write', 'VM 1')

    def _read_data(self):
        """Run read_data.sh on the second VM with the volume attached.

        :return: TestCase.EX_RUN_ERROR if the script could not be copied,
                 otherwise the exit status of read_data.sh
        """
        assert self.cloud
        return self._run_on_volume(
            self.vm2, self.ssh2, 'read_data.sh', 'read volume', 'VM 2')

    def clean(self):
        """Delete the resources created by prepare(), then the parent's.

        The second VM is deleted first (waiting for completion), then its
        floating IP and the data volume; each one is skipped when it was
        never created. The parent's clean() runs last.
        """
        assert self.cloud
        if self.vm2:
            self.cloud.delete_server(
                self.vm2, wait=True,
                timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
        if self.fip2:
            self.cloud.delete_floating_ip(self.fip2.id)
        if self.volume:
            self.cloud.delete_volume(self.volume.id)
        super(CinderCheck, self).clean()