summary | refs | log | tree | commit | diff | stats
path: root/docker/storperf-master
diff options
context:
space:
mode:
Diffstat (limited to 'docker/storperf-master')
-rw-r--r--docker/storperf-master/Dockerfile2
-rw-r--r--docker/storperf-master/rest_server.py8
-rw-r--r--docker/storperf-master/storperf/db/test_results_db.py5
-rw-r--r--docker/storperf-master/storperf/fio/fio_invoker.py29
-rw-r--r--docker/storperf-master/storperf/storperf_master.py75
-rw-r--r--docker/storperf-master/storperf/test_executor.py173
6 files changed, 191 insertions, 101 deletions
diff --git a/docker/storperf-master/Dockerfile b/docker/storperf-master/Dockerfile
index 38bd231..eaaf811 100644
--- a/docker/storperf-master/Dockerfile
+++ b/docker/storperf-master/Dockerfile
@@ -15,7 +15,7 @@
# $ docker build -t opnfv/storperf-master:tag .
#
-ARG ARCH
+ARG ARCH=x86_64
ARG ALPINE_VERSION=v3.6
FROM multiarch/alpine:$ARCH-$ALPINE_VERSION as storperf-builder
diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py
index 6da2004..0634b8f 100644
--- a/docker/storperf-master/rest_server.py
+++ b/docker/storperf-master/rest_server.py
@@ -64,7 +64,9 @@ class ConfigurationRequestModel:
'agent_image': fields.String,
'public_network': fields.String,
'volume_size': fields.Integer,
- 'availability_zone': fields.String
+ 'availability_zone': fields.String,
+ 'username': fields.String,
+ 'password': fields.String
}
@@ -137,6 +139,10 @@ class Configure(Resource):
storperf.volume_size = request.json['volume_size']
if ('availability_zone' in request.json):
storperf.availabilty_zone = request.json['availability_zone']
+ if ('username' in request.json):
+ storperf.username = request.json['username']
+ if ('password' in request.json):
+ storperf.password = request.json['password']
storperf.create_stack()
if storperf.stack_id is None:
diff --git a/docker/storperf-master/storperf/db/test_results_db.py b/docker/storperf-master/storperf/db/test_results_db.py
index d6aabee..9c87e32 100644
--- a/docker/storperf-master/storperf/db/test_results_db.py
+++ b/docker/storperf-master/storperf/db/test_results_db.py
@@ -35,7 +35,6 @@ def push_results_to_db(db_url, details, logger):
logger.debug(r.content)
return json.loads(r.content)
except Exception:
- if logger:
- logger.exception("Error [push_results_to_db('%s', '%s', '%s')]:" %
- (db_url, params, details['details']))
+ logger.exception("Error [push_results_to_db('%s', '%s')]:" %
+ (db_url, params))
return None
diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py
index 106696d..0360ea2 100644
--- a/docker/storperf-master/storperf/fio/fio_invoker.py
+++ b/docker/storperf-master/storperf/fio/fio_invoker.py
@@ -15,13 +15,14 @@ import paramiko
class FIOInvoker(object):
- def __init__(self):
+ def __init__(self, var_dict={}):
self.logger = logging.getLogger(__name__)
self.event_listeners = set()
self.event_callback_ids = set()
self._remote_host = None
self.callback_id = None
self.terminated = False
+ self.metadata = var_dict
@property
def remote_host(self):
@@ -90,11 +91,7 @@ class FIOInvoker(object):
self.logger.debug("Finished")
def execute(self, args=[]):
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(self.remote_host, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ ssh = self._ssh_client()
command = "sudo ./fio " + ' '.join(args)
self.logger.debug("Remote command: %s" % command)
@@ -133,11 +130,7 @@ class FIOInvoker(object):
self.logger.debug("Terminating fio on " + self.remote_host)
self.terminated = True
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(self.remote_host, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ ssh = self._ssh_client()
command = "sudo killall fio"
@@ -151,3 +144,17 @@ class FIOInvoker(object):
stdout.close()
stderr.close()
+
+ def _ssh_client(self):
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ if 'username' in self.metadata and 'password' in self.metadata:
+ ssh.connect(self.remote_host,
+ username=self.metadata['username'],
+ password=self.metadata['password'])
+ return ssh
+ else:
+ ssh.connect(self.remote_host, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+ return ssh
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index 4e99e57..8a67048 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -257,6 +257,36 @@ class StorPerfMaster(object):
'workloads',
str(self._test_executor.workload_modules))
+ @property
+ def username(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'username'
+ )
+
+ @username.setter
+ def username(self, value):
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'username',
+ value
+ )
+
+ @property
+ def password(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'password'
+ )
+
+ @password.setter
+ def password(self, value):
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'password',
+ value
+ )
+
def get_logs(self, lines=None):
LOG_DIR = './storperf.log'
@@ -354,6 +384,9 @@ class StorPerfMaster(object):
params['agent_count'] = self.agent_count
params['public_network'] = self.public_network
params['volume_size'] = self.volume_size
+ if self.username and self.password:
+ params['username'] = self.username
+ params['password'] = self.password
job_id = self._test_executor.execute(params)
return job_id
@@ -424,14 +457,50 @@ class StorPerfMaster(object):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(slave, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ if self.username and self.password:
+ ssh.connect(slave,
+ username=self.username,
+ password=self.password)
+ else:
+ ssh.connect(slave, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+
+ available = self._check_root_fs(ssh)
+ logger.debug("Available space on / is %s" % available)
+ if available < 65536:
+                logger.warn("Root filesystem is too small, attempting resize")
+ self._resize_root_fs(ssh, logger)
+
+ available = self._check_root_fs(ssh)
+ logger.debug("Available space on / is now %s" % available)
+ if available < 65536:
+ logger.error("Cannot create enough space on /")
+ raise Exception("Root filesystem has only %s free" %
+ available)
scp = SCPClient(ssh.get_transport())
logger.debug("Transferring fio to %s" % slave)
scp.put('/usr/local/bin/fio', '~/')
+ def _check_root_fs(self, ssh):
+ (_, stdout, _) = ssh.exec_command("df /")
+ stdout.readline()
+ lines = stdout.readline().split()
+ if len(lines) > 4:
+ available = lines[3]
+ return int(available)
+
+ def _resize_root_fs(self, ssh, logger):
+ command = "sudo /usr/sbin/resize2fs /dev/vda1"
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
def _make_parameters(self):
heat_parameters = {}
heat_parameters['public_network'] = self.public_network
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index 3d1d9f2..4c2c972 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -235,25 +235,90 @@ class TestExecutor(object):
self.start_time = time.time()
self.workload_status = {}
- # Prepare stats list
- for workload_module in self.workload_modules:
- workload_name = getattr(workload_module, "__name__")
- blocksizes = self._block_sizes
- iodepths = self._queue_depths
- for blocksize in blocksizes:
- for iodepth in iodepths:
- name = '%s.%s.queue-depth.%s.block-size.%s' % \
- (self.job_db.job_id, workload_name, iodepth, blocksize)
- self.workload_status[name] = "Pending"
+
+ workloads = self._create_workload_matrix()
+
+ for current_workload in workloads:
+ workload = current_workload['workload']
+ self._thread_gate = ThreadGate(len(self.slaves),
+ workload.options['status-interval'])
+
+ if self._terminated:
+ return
+ self.current_workload = current_workload['name']
+
+ self.logger.info("Starting run %s" % self.current_workload)
+ self.workload_status[self.current_workload] = "Running"
+
+ scheduler = sched.scheduler(time.time, time.sleep)
+ if self.deadline is not None \
+ and not current_workload['workload_name'].startswith("_"):
+ event = scheduler.enter(self.deadline * 60, 1,
+ self.terminate_current_run,
+ ())
+ t = Thread(target=scheduler.run, args=())
+ t.start()
+
+ workload.options['iodepth'] = str(current_workload['queue-depth'])
+ workload.options['bs'] = str(current_workload['blocksize'])
+
+ slave_threads = []
+ for slave in self.slaves:
+ slave_workload = copy.copy(current_workload['workload'])
+ slave_workload.remote_host = slave
+
+ self._workload_executors.append(slave_workload)
+
+ t = Thread(target=self.execute_on_node,
+ args=(slave_workload,),
+ name="%s worker" % slave)
+ t.daemon = False
+ t.start()
+ slave_threads.append(t)
+
+ for slave_thread in slave_threads:
+ self.logger.debug("Waiting on %s" % slave_thread)
+ slave_thread.join()
+ self.logger.debug("Done waiting for %s" % slave_thread)
+
+ if not scheduler.empty():
+ try:
+ scheduler.cancel(event)
+ except ValueError:
+ pass
+
+ self.logger.info("Completed run %s"
+ % self.current_workload)
+ self.workload_status[self.current_workload] = "Completed"
+ self._workload_executors = []
+ self.current_workload = None
+
+ self.logger.info("Completed job %s" % (self.job_db.job_id))
+
+ self.end_time = time.time()
+ self._terminated = True
+ self.broadcast_event()
+ self.unregister(data_handler.data_event)
+ report = {'report': json.dumps(self.metadata)}
+ self.job_db.record_workload_params(report)
+ self.job_db.job_id = None
+ if self.result_url is not None:
+ self.logger.info("Results can be found at %s" % self.result_url)
+
+ def _create_workload_matrix(self):
+ workloads = []
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
- self.logger.info("Starting workload %s" % (workload_name))
constructorMethod = getattr(workload_module, workload_name)
workload = constructorMethod()
if (self.filename is not None):
workload.filename = self.filename
+ workload.id = self.job_db.job_id
+
+ if (self.filename is not None):
+ workload.filename = self.filename
if (workload_name.startswith("_")):
iodepths = [8, ]
@@ -262,85 +327,29 @@ class TestExecutor(object):
iodepths = self._queue_depths
blocksizes = self._block_sizes
- workload.id = self.job_db.job_id
- self._thread_gate = ThreadGate(len(self.slaves),
- workload.options['status-interval'])
-
for blocksize in blocksizes:
for iodepth in iodepths:
- if self._terminated:
- return
- self.current_workload = (
- "%s.%s.queue-depth.%s.block-size.%s"
- % (self.job_db.job_id,
- workload_name,
- iodepth,
- blocksize))
-
- self.logger.info("Starting run %s" % self.current_workload)
- self.workload_status[self.current_workload] = "Running"
-
- scheduler = sched.scheduler(time.time, time.sleep)
- if self.deadline is not None \
- and not workload_name.startswith("_"):
- event = scheduler.enter(self.deadline * 60, 1,
- self.terminate_current_run,
- ())
- t = Thread(target=scheduler.run, args=())
- t.start()
-
- workload.options['iodepth'] = str(iodepth)
- workload.options['bs'] = str(blocksize)
-
- slave_threads = []
- for slave in self.slaves:
- slave_workload = copy.copy(workload)
- slave_workload.remote_host = slave
-
- self._workload_executors.append(slave_workload)
-
- t = Thread(target=self.execute_on_node,
- args=(slave_workload,),
- name="%s worker" % slave)
- t.daemon = False
- t.start()
- slave_threads.append(t)
-
- for slave_thread in slave_threads:
- self.logger.debug("Waiting on %s" % slave_thread)
- slave_thread.join()
- self.logger.debug("Done waiting for %s" % slave_thread)
-
- if not scheduler.empty():
- try:
- scheduler.cancel(event)
- except ValueError:
- pass
-
- self.logger.info("Completed run %s"
- % self.current_workload)
- self.workload_status[self.current_workload] = "Completed"
- self._workload_executors = []
- self.current_workload = None
-
- self.logger.info("Completed workload %s" % (workload_name))
- self.logger.info("Completed job %s" % (self.job_db.job_id))
+ name = '%s.%s.queue-depth.%s.block-size.%s' % \
+ (self.job_db.job_id, workload_name, iodepth, blocksize)
+ self.workload_status[name] = "Pending"
- if self.result_url is not None:
- self.logger.info("Results can be found at %s" % self.result_url)
+ parameters = {'queue-depth': iodepth,
+ 'blocksize': blocksize,
+ 'name': name,
+ 'workload_name': workload_name,
+ 'status': 'Pending',
+ 'workload': workload}
- self.end_time = time.time()
- self._terminated = True
- self.broadcast_event()
- self.unregister(data_handler.data_event)
- report = {'report': json.dumps(self.metadata)}
- self.job_db.record_workload_params(report)
- self.job_db.job_id = None
+ self.logger.info("Workload %s=%s" % (name, parameters))
+
+ workloads.append(parameters)
+
+ return workloads
def execute_on_node(self, workload):
- invoker = FIOInvoker()
+ invoker = FIOInvoker(self.metadata)
invoker.register(self.event)
workload.invoker = invoker