diff options
Diffstat (limited to 'docker/storperf-master/storperf/fio/fio_invoker.py')
-rw-r--r-- | docker/storperf-master/storperf/fio/fio_invoker.py | 153 |
1 files changed, 153 insertions, 0 deletions
diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py new file mode 100644 index 0000000..106696d --- /dev/null +++ b/docker/storperf-master/storperf/fio/fio_invoker.py @@ -0,0 +1,153 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import json +import logging +from threading import Thread +import paramiko + + +class FIOInvoker(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + self.event_listeners = set() + self.event_callback_ids = set() + self._remote_host = None + self.callback_id = None + self.terminated = False + + @property + def remote_host(self): + return self._remote_host + + @remote_host.setter + def remote_host(self, value): + self._remote_host = value + self.logger = logging.getLogger(__name__ + ":" + value) + + def register(self, event_listener): + self.event_listeners.add(event_listener) + + def unregister(self, event_listener): + self.event_listeners.discard(event_listener) + + def stdout_handler(self, stdout): + self.logger.debug("Started") + self.json_body = "" + try: + for line in iter(stdout.readline, b''): + if line.startswith("fio"): + line = "" + continue + self.json_body += line + try: + if line == "}\n": + json_metric = json.loads(self.json_body) + self.json_body = "" + + if not self.terminated: + for event_listener in self.event_listeners: + try: + self.logger.debug( + "Event listener callback") + event_listener( + self.callback_id, json_metric) + except Exception, e: + self.logger.exception( + "Notifying listener %s: %s", + self.callback_id, e) + self.logger.debug( + "Event listener callback complete") + except Exception, e: + self.logger.error("Error parsing JSON: %s", e) + except IOError: + pass # We might have read from the closed socket, ignore it + + stdout.close() + self.logger.debug("Finished") + + def stderr_handler(self, stderr): + self.logger.debug("Started") + for line in iter(stderr.readline, b''): + self.logger.error("FIO Error: %s", line.rstrip()) + + # Sometime, FIO gets stuck and will give us this message: + # fio: job 'sequential_read' hasn't exited in 60 seconds, + # it appears to be stuck. Doing forceful exit of this job. + # A second killall of fio will release it stuck process. + + if 'it appears to be stuck' in line: + self.terminate() + + stderr.close() + self.logger.debug("Finished") + + def execute(self, args=[]): + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(self.remote_host, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + command = "sudo ./fio " + ' '.join(args) + self.logger.debug("Remote command: %s" % command) + + chan = ssh.get_transport().open_session(timeout=None) + chan.settimeout(None) + chan.exec_command(command) + stdout = chan.makefile('r', -1) + stderr = chan.makefile_stderr('r', -1) + + tout = Thread(target=self.stdout_handler, args=(stdout,), + name="%s stdout" % self._remote_host) + tout.daemon = True + tout.start() + + terr = Thread(target=self.stderr_handler, args=(stderr,), + name="%s stderr" % self._remote_host) + terr.daemon = True + terr.start() + + self.logger.info("Started fio on " + self.remote_host) + exit_status = chan.recv_exit_status() + self.logger.info("Finished fio on %s with exit code %s" % + (self.remote_host, exit_status)) + + stdout.close() + stderr.close() + + self.logger.debug("Joining stderr handler") + terr.join() + self.logger.debug("Joining stdout handler") + tout.join() + self.logger.debug("Ended") + + def terminate(self): + self.logger.debug("Terminating fio on " + self.remote_host) + self.terminated = True + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(self.remote_host, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + command = "sudo killall fio" + + self.logger.debug("Executing on %s: %s" % (self.remote_host, command)) + (_, stdout, stderr) = ssh.exec_command(command) + + for line in stdout.readlines(): + self.logger.debug(line.strip()) + for line in stderr.readlines(): + self.logger.error(line.strip()) + + stdout.close() + stderr.close() |