From f16df4d50c215571a5dc40d5976a292ad912f24b Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Fri, 2 Dec 2016 11:19:54 -0500 Subject: Change to Paramiko Use Paramiko ssh client instead of invoking ssh and scp from the command line Change-Id: Ibc8395b98842fd7f40b49c4dafa2688d8e64abc7 JIRA: STORPERF-91 Signed-off-by: Mark Beierl --- ci/verify.sh | 2 ++ docker/Dockerfile | 2 ++ docker/requirements.pip | 2 ++ storperf/carbon/emitter.py | 2 ++ storperf/fio/fio_invoker.py | 81 +++++++++++++++++++++--------------------- storperf/logging.json | 8 ++++- storperf/storperf_master.py | 85 +++++++++++++-------------------------------- 7 files changed, 82 insertions(+), 100 deletions(-) diff --git a/ci/verify.sh b/ci/verify.sh index e87c757..22d0186 100755 --- a/ci/verify.sh +++ b/ci/verify.sh @@ -32,6 +32,7 @@ pip install html2text==2016.1.8 pip install matplotlib==1.3.1 pip install mock==1.3.0 pip install nose==1.3.7 +pip install paramiko==2.0.2 pip install python-cinderclient==1.6.0 pip install python-glanceclient==1.1.0 pip install python-heatclient==0.8.0 @@ -40,6 +41,7 @@ pip install python-neutronclient==2.6.0 pip install python-novaclient==2.28.1 pip install pyyaml==3.10 pip install requests==2.9.1 +pip install scp==0.10.2 pip install six==1.10.0 python ci/setup.py develop diff --git a/docker/Dockerfile b/docker/Dockerfile index 449ad70..d8abf63 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -36,6 +36,8 @@ libaio-dev \ zlib1g-dev \ supervisor \ ssh \ +libssl-dev \ +libffi-dev \ rsync \ git \ wget \ diff --git a/docker/requirements.pip b/docker/requirements.pip index 8efaf24..69f4ab2 100644 --- a/docker/requirements.pip +++ b/docker/requirements.pip @@ -11,3 +11,5 @@ flask-restful==0.3.5 flask-restful-swagger==0.19 flask-swagger==0.2.12 html2text==2016.1.8 +paramiko=2.0.2 +scp==0.10.2 diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py index 6104fd4..c9af8a6 100644 --- a/storperf/carbon/emitter.py +++ b/storperf/carbon/emitter.py @@ -35,3 +35,5 @@ class CarbonMetricTransmitter(): carbon_socket.send(message + '\n') carbon_socket.close() + self.logger.info("Sent metrics to carbon with timestamp %s" + % timestamp) diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index 59dbdaf..315b243 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -11,6 +11,7 @@ import json import logging import subprocess from threading import Thread +import paramiko class FIOInvoker(object): @@ -37,10 +38,11 @@ class FIOInvoker(object): def unregister(self, event_listener): self.event_listeners.discard(event_listener) - def stdout_handler(self): + def stdout_handler(self, stdout): + self.logger.debug("Started") self.json_body = "" try: - for line in iter(self.fio_process.stdout.readline, b''): + for line in iter(stdout.readline, b''): if line.startswith("fio"): line = "" continue @@ -54,52 +56,53 @@ class FIOInvoker(object): try: event_listener(self.callback_id, json_metric) except Exception, e: - self.logger.error("Notifying listener %s: %s", - self.callback_id, e) + self.logger.exception( + "Notifying listener %s: %s", + self.callback_id, e) + self.logger.info( + "Event listener callback complete") except Exception, e: self.logger.error("Error parsing JSON: %s", e) except ValueError: pass # We might have read from the closed socket, ignore it - self.fio_process.stdout.close() + stdout.close() + self.logger.debug("Finished") - def stderr_handler(self): - for line in iter(self.fio_process.stderr.readline, b''): - self.logger.error("FIO Error: %s", line) + def stderr_handler(self, stderr): + self.logger.debug("Started") + for line in iter(stderr.readline, b''): + self.logger.error("FIO Error: %s", line.rstrip()) - self.fio_process.stderr.close() + stderr.close() + self.logger.debug("Finished") def execute(self, args=[]): - self.logger.debug("FIO args " + str(args)) - - if (self.remote_host is None): - cmd = "fio" - else: - cmd = "ssh" - additional_args = ['-o', 'StrictHostKeyChecking=no', - '-o', 'UserKnownHostsFile=/dev/null', - '-o', 'LogLevel=error', - '-i', 'storperf/resources/ssh/storperf_rsa', - 'storperf@' + self.remote_host, - "sudo", "./fio"] - args = additional_args + args - - self.fio_process = subprocess.Popen([cmd] + args, - universal_newlines=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - t = Thread(target=self.stdout_handler, args=()) - t.daemon = True - t.start() - - t = Thread(target=self.stderr_handler, args=()) - t.daemon = True - t.start() - - self.logger.debug("Started fio on " + self.remote_host) - t.join() - self.logger.debug("Finished fio on " + self.remote_host) + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(self.remote_host, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + command = "sudo ./fio " + ' '.join(args) + self.logger.debug("Remote command: %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + + tout = Thread(target=self.stdout_handler, args=(stdout,), + name="%s stdout" % self._remote_host) + tout.daemon = True + tout.start() + + terr = Thread(target=self.stderr_handler, args=(stderr,), + name="%s stderr" % self._remote_host) + terr.daemon = True + terr.start() + + self.logger.info("Started fio on " + self.remote_host) + terr.join() + tout.join() + self.logger.info("Finished fio on " + self.remote_host) def terminate(self): self.logger.debug("Terminating fio on " + self.remote_host) diff --git a/storperf/logging.json b/storperf/logging.json index 6168717..6d6026e 100644 --- a/storperf/logging.json +++ b/storperf/logging.json @@ -3,7 +3,7 @@ "disable_existing_loggers": false, "formatters": { "simple": { - "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + "format": "%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s" } }, @@ -52,5 +52,11 @@ "storperf": { "level": "DEBUG", "handlers": ["console", "file_handler", "error_file_handler"] + }, + + "storperf.carbon.emitter": { + "level": "INFO", + "handlers": ["console", "file_handler", "error_file_handler"] } + } \ No newline at end of file diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py index 99df47f..35cba72 100644 --- a/storperf/storperf_master.py +++ b/storperf/storperf_master.py @@ -11,16 +11,18 @@ from datetime import datetime import logging import os import socket -import subprocess from threading import Thread from time import sleep +import paramiko +from scp import SCPClient + import cinderclient.v2 as cinderclient -from db.configuration_db import ConfigurationDB -from db.job_db import JobDB import heatclient.client as heatclient import keystoneclient.v2_0 as ksclient +from storperf.db.configuration_db import ConfigurationDB from storperf.db.graphite_db import GraphiteDB +from storperf.db.job_db import JobDB from test_executor import TestExecutor @@ -359,63 +361,26 @@ class StorPerfMaster(object): alive = True logger.debug("Slave " + slave + " is alive and ready") - args = ['scp', '-o', 'StrictHostKeyChecking=no', - '-o', 'UserKnownHostsFile=/dev/null', - '-o', 'LogLevel=error', - '-i', 'storperf/resources/ssh/storperf_rsa', - '/lib/x86_64-linux-gnu/libaio.so.1', - 'storperf@' + slave + ":"] - - logger.debug(args) - proc = subprocess.Popen(args, - universal_newlines=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - (stdout, stderr) = proc.communicate() - if (len(stdout) > 0): - logger.debug(stdout.decode('utf-8').strip()) - if (len(stderr) > 0): - logger.error(stderr.decode('utf-8').strip()) - - args = ['scp', '-o', 'StrictHostKeyChecking=no', - '-o', 'UserKnownHostsFile=/dev/null', - '-o', 'LogLevel=error', - '-i', 'storperf/resources/ssh/storperf_rsa', - '/usr/local/bin/fio', - 'storperf@' + slave + ":"] - - logger.debug(args) - proc = subprocess.Popen(args, - universal_newlines=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - (stdout, stderr) = proc.communicate() - if (len(stdout) > 0): - logger.debug(stdout.decode('utf-8').strip()) - if (len(stderr) > 0): - logger.error(stderr.decode('utf-8').strip()) - - args = ['ssh', '-o', 'StrictHostKeyChecking=no', - '-o', 'UserKnownHostsFile=/dev/null', - '-o', 'LogLevel=error', - '-i', 'storperf/resources/ssh/storperf_rsa', - 'storperf@' + slave, - 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1' - ] - - logger.debug(args) - proc = subprocess.Popen(args, - universal_newlines=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - (stdout, stderr) = proc.communicate() - if (len(stdout) > 0): - logger.debug(stdout.decode('utf-8').strip()) - if (len(stderr) > 0): - logger.error(stderr.decode('utf-8').strip()) + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(slave, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + scp = SCPClient(ssh.get_transport()) + logger.debug("Transferring libaio.so.1 to %s" % slave) + scp.put('/lib/x86_64-linux-gnu/libaio.so.1', '~/') + logger.debug("Transferring fio to %s" % slave) + scp.put('/usr/local/bin/fio', '~/') + + cmd = 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1' + logger.debug("Executing on %s: %s" % (slave, cmd)) + (_, stdout, stderr) = ssh.exec_command(cmd) + + for line in stdout.readlines(): + logger.debug(line.decode('utf-8').strip()) + for line in stderr.readlines(): + logger.error(line.decode('utf-8').strip()) def _make_parameters(self): heat_parameters = {} -- cgit 1.2.3-korg