diff options
Diffstat (limited to 'storperf/fio/fio_invoker.py')
-rw-r--r-- | storperf/fio/fio_invoker.py | 81 |
1 files changed, 42 insertions, 39 deletions
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index 59dbdaf..315b243 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -11,6 +11,7 @@ import json import logging import subprocess from threading import Thread +import paramiko class FIOInvoker(object): @@ -37,10 +38,11 @@ class FIOInvoker(object): def unregister(self, event_listener): self.event_listeners.discard(event_listener) - def stdout_handler(self): + def stdout_handler(self, stdout): + self.logger.debug("Started") self.json_body = "" try: - for line in iter(self.fio_process.stdout.readline, b''): + for line in iter(stdout.readline, b''): if line.startswith("fio"): line = "" continue @@ -54,52 +56,53 @@ class FIOInvoker(object): try: event_listener(self.callback_id, json_metric) except Exception, e: - self.logger.error("Notifying listener %s: %s", - self.callback_id, e) + self.logger.exception( + "Notifying listener %s: %s", + self.callback_id, e) + self.logger.info( + "Event listener callback complete") except Exception, e: self.logger.error("Error parsing JSON: %s", e) except ValueError: pass # We might have read from the closed socket, ignore it - self.fio_process.stdout.close() + stdout.close() + self.logger.debug("Finished") - def stderr_handler(self): - for line in iter(self.fio_process.stderr.readline, b''): - self.logger.error("FIO Error: %s", line) + def stderr_handler(self, stderr): + self.logger.debug("Started") + for line in iter(stderr.readline, b''): + self.logger.error("FIO Error: %s", line.rstrip()) - self.fio_process.stderr.close() + stderr.close() + self.logger.debug("Finished") def execute(self, args=[]): - self.logger.debug("FIO args " + str(args)) - - if (self.remote_host is None): - cmd = "fio" - else: - cmd = "ssh" - additional_args = ['-o', 'StrictHostKeyChecking=no', - '-o', 'UserKnownHostsFile=/dev/null', - '-o', 'LogLevel=error', - '-i', 'storperf/resources/ssh/storperf_rsa', - 'storperf@' + self.remote_host, - "sudo", "./fio"] - args = additional_args + args - - self.fio_process = subprocess.Popen([cmd] + args, - universal_newlines=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - t = Thread(target=self.stdout_handler, args=()) - t.daemon = True - t.start() - - t = Thread(target=self.stderr_handler, args=()) - t.daemon = True - t.start() - - self.logger.debug("Started fio on " + self.remote_host) - t.join() - self.logger.debug("Finished fio on " + self.remote_host) + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(self.remote_host, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + command = "sudo ./fio " + ' '.join(args) + self.logger.debug("Remote command: %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + + tout = Thread(target=self.stdout_handler, args=(stdout,), + name="%s stdout" % self._remote_host) + tout.daemon = True + tout.start() + + terr = Thread(target=self.stderr_handler, args=(stderr,), + name="%s stderr" % self._remote_host) + terr.daemon = True + terr.start() + + self.logger.info("Started fio on " + self.remote_host) + terr.join() + tout.join() + self.logger.info("Finished fio on " + self.remote_host) def terminate(self): self.logger.debug("Terminating fio on " + self.remote_host) |