summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xci/verify.sh2
-rw-r--r--docker/Dockerfile2
-rw-r--r--docker/requirements.pip2
-rw-r--r--storperf/carbon/emitter.py2
-rw-r--r--storperf/fio/fio_invoker.py81
-rw-r--r--storperf/logging.json8
-rw-r--r--storperf/storperf_master.py85
7 files changed, 82 insertions, 100 deletions
diff --git a/ci/verify.sh b/ci/verify.sh
index e87c757..22d0186 100755
--- a/ci/verify.sh
+++ b/ci/verify.sh
@@ -32,6 +32,7 @@ pip install html2text==2016.1.8
pip install matplotlib==1.3.1
pip install mock==1.3.0
pip install nose==1.3.7
+pip install paramiko==2.0.2
pip install python-cinderclient==1.6.0
pip install python-glanceclient==1.1.0
pip install python-heatclient==0.8.0
@@ -40,6 +41,7 @@ pip install python-neutronclient==2.6.0
pip install python-novaclient==2.28.1
pip install pyyaml==3.10
pip install requests==2.9.1
+pip install scp==0.10.2
pip install six==1.10.0
python ci/setup.py develop
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 449ad70..d8abf63 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -36,6 +36,8 @@ libaio-dev \
zlib1g-dev \
supervisor \
ssh \
+libssl-dev \
+libffi-dev \
rsync \
git \
wget \
diff --git a/docker/requirements.pip b/docker/requirements.pip
index 8efaf24..69f4ab2 100644
--- a/docker/requirements.pip
+++ b/docker/requirements.pip
@@ -11,3 +11,5 @@ flask-restful==0.3.5
flask-restful-swagger==0.19
flask-swagger==0.2.12
html2text==2016.1.8
+paramiko=2.0.2
+scp==0.10.2
diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py
index 6104fd4..c9af8a6 100644
--- a/storperf/carbon/emitter.py
+++ b/storperf/carbon/emitter.py
@@ -35,3 +35,5 @@ class CarbonMetricTransmitter():
carbon_socket.send(message + '\n')
carbon_socket.close()
+ self.logger.info("Sent metrics to carbon with timestamp %s"
+ % timestamp)
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index 59dbdaf..315b243 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -11,6 +11,7 @@ import json
import logging
import subprocess
from threading import Thread
+import paramiko
class FIOInvoker(object):
@@ -37,10 +38,11 @@ class FIOInvoker(object):
def unregister(self, event_listener):
self.event_listeners.discard(event_listener)
- def stdout_handler(self):
+ def stdout_handler(self, stdout):
+ self.logger.debug("Started")
self.json_body = ""
try:
- for line in iter(self.fio_process.stdout.readline, b''):
+ for line in iter(stdout.readline, b''):
if line.startswith("fio"):
line = ""
continue
@@ -54,52 +56,53 @@ class FIOInvoker(object):
try:
event_listener(self.callback_id, json_metric)
except Exception, e:
- self.logger.error("Notifying listener %s: %s",
- self.callback_id, e)
+ self.logger.exception(
+ "Notifying listener %s: %s",
+ self.callback_id, e)
+ self.logger.info(
+ "Event listener callback complete")
except Exception, e:
self.logger.error("Error parsing JSON: %s", e)
except ValueError:
pass # We might have read from the closed socket, ignore it
- self.fio_process.stdout.close()
+ stdout.close()
+ self.logger.debug("Finished")
- def stderr_handler(self):
- for line in iter(self.fio_process.stderr.readline, b''):
- self.logger.error("FIO Error: %s", line)
+ def stderr_handler(self, stderr):
+ self.logger.debug("Started")
+ for line in iter(stderr.readline, b''):
+ self.logger.error("FIO Error: %s", line.rstrip())
- self.fio_process.stderr.close()
+ stderr.close()
+ self.logger.debug("Finished")
def execute(self, args=[]):
- self.logger.debug("FIO args " + str(args))
-
- if (self.remote_host is None):
- cmd = "fio"
- else:
- cmd = "ssh"
- additional_args = ['-o', 'StrictHostKeyChecking=no',
- '-o', 'UserKnownHostsFile=/dev/null',
- '-o', 'LogLevel=error',
- '-i', 'storperf/resources/ssh/storperf_rsa',
- 'storperf@' + self.remote_host,
- "sudo", "./fio"]
- args = additional_args + args
-
- self.fio_process = subprocess.Popen([cmd] + args,
- universal_newlines=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- t = Thread(target=self.stdout_handler, args=())
- t.daemon = True
- t.start()
-
- t = Thread(target=self.stderr_handler, args=())
- t.daemon = True
- t.start()
-
- self.logger.debug("Started fio on " + self.remote_host)
- t.join()
- self.logger.debug("Finished fio on " + self.remote_host)
+
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.connect(self.remote_host, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+
+ command = "sudo ./fio " + ' '.join(args)
+ self.logger.debug("Remote command: %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+
+ tout = Thread(target=self.stdout_handler, args=(stdout,),
+ name="%s stdout" % self._remote_host)
+ tout.daemon = True
+ tout.start()
+
+ terr = Thread(target=self.stderr_handler, args=(stderr,),
+ name="%s stderr" % self._remote_host)
+ terr.daemon = True
+ terr.start()
+
+ self.logger.info("Started fio on " + self.remote_host)
+ terr.join()
+ tout.join()
+ self.logger.info("Finished fio on " + self.remote_host)
def terminate(self):
self.logger.debug("Terminating fio on " + self.remote_host)
diff --git a/storperf/logging.json b/storperf/logging.json
index 6168717..6d6026e 100644
--- a/storperf/logging.json
+++ b/storperf/logging.json
@@ -3,7 +3,7 @@
"disable_existing_loggers": false,
"formatters": {
"simple": {
- "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+ "format": "%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s"
}
},
@@ -52,5 +52,11 @@
"storperf": {
"level": "DEBUG",
"handlers": ["console", "file_handler", "error_file_handler"]
+ },
+
+ "storperf.carbon.emitter": {
+ "level": "INFO",
+ "handlers": ["console", "file_handler", "error_file_handler"]
}
+
} \ No newline at end of file
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index 99df47f..35cba72 100644
--- a/storperf/storperf_master.py
+++ b/storperf/storperf_master.py
@@ -11,16 +11,18 @@ from datetime import datetime
import logging
import os
import socket
-import subprocess
from threading import Thread
from time import sleep
+import paramiko
+from scp import SCPClient
+
import cinderclient.v2 as cinderclient
-from db.configuration_db import ConfigurationDB
-from db.job_db import JobDB
import heatclient.client as heatclient
import keystoneclient.v2_0 as ksclient
+from storperf.db.configuration_db import ConfigurationDB
from storperf.db.graphite_db import GraphiteDB
+from storperf.db.job_db import JobDB
from test_executor import TestExecutor
@@ -359,63 +361,26 @@ class StorPerfMaster(object):
alive = True
logger.debug("Slave " + slave + " is alive and ready")
- args = ['scp', '-o', 'StrictHostKeyChecking=no',
- '-o', 'UserKnownHostsFile=/dev/null',
- '-o', 'LogLevel=error',
- '-i', 'storperf/resources/ssh/storperf_rsa',
- '/lib/x86_64-linux-gnu/libaio.so.1',
- 'storperf@' + slave + ":"]
-
- logger.debug(args)
- proc = subprocess.Popen(args,
- universal_newlines=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- (stdout, stderr) = proc.communicate()
- if (len(stdout) > 0):
- logger.debug(stdout.decode('utf-8').strip())
- if (len(stderr) > 0):
- logger.error(stderr.decode('utf-8').strip())
-
- args = ['scp', '-o', 'StrictHostKeyChecking=no',
- '-o', 'UserKnownHostsFile=/dev/null',
- '-o', 'LogLevel=error',
- '-i', 'storperf/resources/ssh/storperf_rsa',
- '/usr/local/bin/fio',
- 'storperf@' + slave + ":"]
-
- logger.debug(args)
- proc = subprocess.Popen(args,
- universal_newlines=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- (stdout, stderr) = proc.communicate()
- if (len(stdout) > 0):
- logger.debug(stdout.decode('utf-8').strip())
- if (len(stderr) > 0):
- logger.error(stderr.decode('utf-8').strip())
-
- args = ['ssh', '-o', 'StrictHostKeyChecking=no',
- '-o', 'UserKnownHostsFile=/dev/null',
- '-o', 'LogLevel=error',
- '-i', 'storperf/resources/ssh/storperf_rsa',
- 'storperf@' + slave,
- 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1'
- ]
-
- logger.debug(args)
- proc = subprocess.Popen(args,
- universal_newlines=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- (stdout, stderr) = proc.communicate()
- if (len(stdout) > 0):
- logger.debug(stdout.decode('utf-8').strip())
- if (len(stderr) > 0):
- logger.error(stderr.decode('utf-8').strip())
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.connect(slave, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+
+ scp = SCPClient(ssh.get_transport())
+ logger.debug("Transferring libaio.so.1 to %s" % slave)
+ scp.put('/lib/x86_64-linux-gnu/libaio.so.1', '~/')
+ logger.debug("Transferring fio to %s" % slave)
+ scp.put('/usr/local/bin/fio', '~/')
+
+ cmd = 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1'
+ logger.debug("Executing on %s: %s" % (slave, cmd))
+ (_, stdout, stderr) = ssh.exec_command(cmd)
+
+ for line in stdout.readlines():
+ logger.debug(line.decode('utf-8').strip())
+ for line in stderr.readlines():
+ logger.error(line.decode('utf-8').strip())
def _make_parameters(self):
heat_parameters = {}