###############################################################################
# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
# and others #
# #
# All rights reserved. This program and the accompanying materials #
# are made available under the terms of the Apache License, Version 2.0 #
# which accompanies this distribution, and is available at #
# http://www.apache.org/licenses/LICENSE-2.0 #
###############################################################################
import os
import paramiko
from utils.binary_converter import BinaryConverter
from discover.scan_error import ScanError
class SshError(Exception):
pass
class SshConnection(BinaryConverter):
connections = {}
max_call_count_per_con = 100
timeout = 15 # timeout for exec in seconds
DEFAULT_PORT = 22
def __init__(self, _host: str, _user: str, _pwd: str=None, _key: str = None,
_port: int = None, _call_count_limit: int=None,
for_sftp: bool = False):
super().__init__()
self.host = _host
self.ssh_client = None
self.ftp = None
self.for_sftp = for_sftp
self.key = _key
self.port = _port
self.user = _user
self.pwd = _pwd
self.check_definitions()
self.fetched_host_details = False
self.call_count = 0
self.call_count_limit = 0 if for_sftp \
else (SshConnection.max_call_count_per_con
if _call_count_limit is None else _call_count_limit)
self.connections[self.get_connection_key(_host, for_sftp)] = self
def check_definitions(self):
if not self.host:
raise ValueError('Missing definition of host for CLI access')
if not self.user:
raise ValueError('Missing definition of user ' +
'for CLI access to host {}'.format(self.host))
if self.key and not os.path.exists(self.key):
raise ValueError('Key file not found: ' + self.key)
if not self.key and not self.pwd:
raise ValueError('Must specify key or password ' +
'for CLI access to host {}'.format(self.host))
@staticmethod
def get_ssh(host, _for_sftp=False):
return SshConnection.get_connection(host, for_sftp=_for_sftp)
@staticmethod
def get_connection_key(host, for_sftp=False):
key = ('sftp-' if for_sftp else '') + host
return key
@staticmethod
def get_connection(host, for_sftp=False):
key = SshConnection.get_connection_key(host, for_sftp)
return SshConnection.connections.get(key)
def disconnect(self):