1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
#!/usr/bin/env python
# Copyright (c) 2017 Okinawa Open Laboratory and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
"""ssh client module for vrouter testing"""
import logging
import paramiko
import time
import yaml
from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
RECEIVE_ROOP_WAIT = 1
DEFAULT_CONNECT_TIMEOUT = 10
DEFAULT_CONNECT_RETRY_COUNT = 10
DEFAULT_SEND_TIMEOUT = 10
class SshClient(object):
"""ssh client class for vrouter testing"""
logger = logging.getLogger(__name__)
def __init__(self, ip_address, user, password=None, key_filename=None):
self.ip_address = ip_address
self.user = user
self.password = password
self.key_filename = key_filename
self.connected = False
self.shell = None
self.logger.setLevel(logging.INFO)
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.util = Utilvnf()
with open(self.util.test_env_config_yaml) as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
self.ssh_revieve_buff = test_env_config_yaml.get("general").get(
"ssh_receive_buffer")
def connect(self, time_out=DEFAULT_CONNECT_TIMEOUT,
retrycount=DEFAULT_CONNECT_RETRY_COUNT):
while retrycount > 0:
try:
self.logger.info("SSH connect to %s.", self.ip_address)
self.ssh.connect(self.ip_address,
username=self.user,
password=self.password,
key_filename=self.key_filename,
timeout=time_out,
look_for_keys=False,
allow_agent=False)
self.logger.info("SSH connection established to %s.",
self.ip_address)
self.shell = self.ssh.invoke_shell()
while not self.shell.recv_ready():
time.sleep(RECEIVE_ROOP_WAIT)
self.shell.recv(self.ssh_revieve_buff)
break
except: # pylint: disable=broad-except
self.logger.info("SSH timeout for %s...", self.ip_address)
time.sleep(time_out)
retrycount -= 1
if retrycount == 0:
self.logger.error("Cannot establish connection to IP '%s'. " +
"Aborting",
self.ip_address)
self.connected = False
return self.connected
self.connected = True
return self.connected
def send(self, cmd, prompt, timeout=DEFAULT_SEND_TIMEOUT):
if self.connected is True:
self.shell.settimeout(timeout)
self.logger.debug("Commandset : '%s'", cmd)
try:
self.shell.send(cmd + '\n')
except: # pylint: disable=broad-except
self.logger.error("ssh send timeout : Command : '%s'", cmd)
return None
res_buff = ''
while not res_buff.endswith(prompt):
time.sleep(RECEIVE_ROOP_WAIT)
try:
res = self.shell.recv(self.ssh_revieve_buff)
except: # pylint: disable=broad-except
self.logger.error("ssh receive timeout : Command : '%s'",
cmd)
break
res_buff += res
self.logger.debug("Response : '%s'", res_buff)
return res_buff
else:
self.logger.error("Cannot connected to IP '%s'.", self.ip_address)
return None
def close(self):
if self.connected is True:
self.ssh.close()
def error_check(response, err_strs=["error",
"warn",
"unknown command",
"already exist"]):
for err in err_strs:
if err in response:
return False
return True
|