summaryrefslogtreecommitdiffstats
path: root/storperf/fio/fio_invoker.py
diff options
context:
space:
mode:
Diffstat (limited to 'storperf/fio/fio_invoker.py')
-rw-r--r--storperf/fio/fio_invoker.py81
1 files changed, 42 insertions, 39 deletions
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index 59dbdaf..315b243 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -11,6 +11,7 @@ import json
import logging
import subprocess
from threading import Thread
+import paramiko
class FIOInvoker(object):
@@ -37,10 +38,11 @@ class FIOInvoker(object):
def unregister(self, event_listener):
self.event_listeners.discard(event_listener)
- def stdout_handler(self):
+ def stdout_handler(self, stdout):
+ self.logger.debug("Started")
self.json_body = ""
try:
- for line in iter(self.fio_process.stdout.readline, b''):
+ for line in iter(stdout.readline, b''):
if line.startswith("fio"):
line = ""
continue
@@ -54,52 +56,53 @@ class FIOInvoker(object):
try:
event_listener(self.callback_id, json_metric)
except Exception, e:
- self.logger.error("Notifying listener %s: %s",
- self.callback_id, e)
+ self.logger.exception(
+ "Notifying listener %s: %s",
+ self.callback_id, e)
+ self.logger.info(
+ "Event listener callback complete")
except Exception, e:
self.logger.error("Error parsing JSON: %s", e)
except ValueError:
pass # We might have read from the closed socket, ignore it
- self.fio_process.stdout.close()
+ stdout.close()
+ self.logger.debug("Finished")
- def stderr_handler(self):
- for line in iter(self.fio_process.stderr.readline, b''):
- self.logger.error("FIO Error: %s", line)
+ def stderr_handler(self, stderr):
+ self.logger.debug("Started")
+ for line in iter(stderr.readline, b''):
+ self.logger.error("FIO Error: %s", line.rstrip())
- self.fio_process.stderr.close()
+ stderr.close()
+ self.logger.debug("Finished")
def execute(self, args=[]):
- self.logger.debug("FIO args " + str(args))
-
- if (self.remote_host is None):
- cmd = "fio"
- else:
- cmd = "ssh"
- additional_args = ['-o', 'StrictHostKeyChecking=no',
- '-o', 'UserKnownHostsFile=/dev/null',
- '-o', 'LogLevel=error',
- '-i', 'storperf/resources/ssh/storperf_rsa',
- 'storperf@' + self.remote_host,
- "sudo", "./fio"]
- args = additional_args + args
-
- self.fio_process = subprocess.Popen([cmd] + args,
- universal_newlines=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- t = Thread(target=self.stdout_handler, args=())
- t.daemon = True
- t.start()
-
- t = Thread(target=self.stderr_handler, args=())
- t.daemon = True
- t.start()
-
- self.logger.debug("Started fio on " + self.remote_host)
- t.join()
- self.logger.debug("Finished fio on " + self.remote_host)
+
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.connect(self.remote_host, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+
+ command = "sudo ./fio " + ' '.join(args)
+ self.logger.debug("Remote command: %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+
+ tout = Thread(target=self.stdout_handler, args=(stdout,),
+ name="%s stdout" % self._remote_host)
+ tout.daemon = True
+ tout.start()
+
+ terr = Thread(target=self.stderr_handler, args=(stderr,),
+ name="%s stderr" % self._remote_host)
+ terr.daemon = True
+ terr.start()
+
+ self.logger.info("Started fio on " + self.remote_host)
+ terr.join()
+ tout.join()
+ self.logger.info("Finished fio on " + self.remote_host)
def terminate(self):
self.logger.debug("Terminating fio on " + self.remote_host)