1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#!/usr/bin/env python
# Copyright (c) 2018 Enea AB and others
# This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
#
# http://www.apache.org/licenses/LICENSE-2.0
"""CinderCheck testcase."""
import logging
import pkg_resources
from scp import SCPClient
from xtesting.core import testcase
from functest.core import singlevm
from functest.utils import config
from functest.utils import env
class CinderCheck(singlevm.SingleVm2):
"""
CinderCheck testcase implementation.
Class to execute the CinderCheck test using 2 Floating IPs
to connect to the VMs and one data volume
"""
# pylint: disable=too-many-instance-attributes
volume_timeout = 60
def __init__(self, **kwargs):
"""Initialize testcase."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cinder_test"
super(CinderCheck, self).__init__(**kwargs)
self.logger = logging.getLogger(__name__)
self.vm2 = None
self.fip2 = None
self.ssh2 = None
self.volume = None
def execute(self):
"""Execute CinderCheck testcase.
Sets up the OpenStack keypair, router, security group, and VM instance
objects then validates cinder.
:return: the exit code from the super.execute() method
"""
return self._write_data() or self._read_data()
def prepare(self):
super(CinderCheck, self).prepare()
self.vm2 = self.boot_vm(
'{}-vm2_{}'.format(self.case_name, self.guid),
key_name=self.keypair.id,
security_groups=[self.sec.id])
(self.fip2, self.ssh2) = self.connect(self.vm2)
self.volume = self.cloud.create_volume(
name='{}-volume_{}'.format(self.case_name, self.guid), size='2',
timeout=self.volume_timeout, wait=True)
def _write_data(self):
assert self.cloud
self.cloud.attach_volume(self.sshvm, self.volume,
timeout=self.volume_timeout)
write_data_script = pkg_resources.resource_filename(
'functest.opnfv_tests.openstack.cinder', 'write_data.sh')
try:
scp = SCPClient(self.ssh.get_transport())
scp.put(write_data_script, remote_path="~/")
except Exception: # pylint: disable=broad-except
self.logger.error("File not transfered!")
return testcase.TestCase.EX_RUN_ERROR
self.logger.debug("ssh: %s", self.ssh)
(_, stdout, stderr) = self.ssh.exec_command(
"sh ~/write_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
self.logger.debug(
"volume_write stdout: %s", stdout.read().decode("utf-8"))
self.logger.debug(
"volume_write stderr: %s", stderr.read().decode("utf-8"))
# Detach volume from VM 1
self.logger.info("Detach volume from VM 1")
self.cloud.detach_volume(
self.sshvm, self.volume, timeout=self.volume_timeout)
return stdout.channel.recv_exit_status()
def _read_data(self):
assert self.cloud
# Attach volume to VM 2
self.logger.info("Attach volume to VM 2")
self.cloud.attach_volume(self.vm2, self.volume,
timeout=self.volume_timeout)
# Check volume data
read_data_script = pkg_resources.resource_filename(
'functest.opnfv_tests.openstack.cinder', 'read_data.sh')
try:
scp = SCPClient(self.ssh2.get_transport())
scp.put(read_data_script, remote_path="~/")
except Exception: # pylint: disable=broad-except
self.logger.error("File not transfered!")
return testcase.TestCase.EX_RUN_ERROR
self.logger.debug("ssh: %s", self.ssh2)
(_, stdout, stderr) = self.ssh2.exec_command(
"sh ~/read_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
self.logger.debug(
"read volume stdout: %s", stdout.read().decode("utf-8"))
self.logger.debug(
"read volume stderr: %s", stderr.read().decode("utf-8"))
self.logger.info("Detach volume from VM 2")
self.cloud.detach_volume(
self.vm2, self.volume, timeout=self.volume_timeout)
return stdout.channel.recv_exit_status()
def clean(self):
assert self.cloud
if self.vm2:
self.cloud.delete_server(
self.vm2, wait=True,
timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
if self.fip2:
self.cloud.delete_floating_ip(self.fip2.id)
if self.volume:
self.cloud.delete_volume(self.volume.id)
super(CinderCheck, self).clean()
|