summaryrefslogtreecommitdiffstats
path: root/docker/storperf-master
diff options
context:
space:
mode:
Diffstat (limited to 'docker/storperf-master')
-rw-r--r--docker/storperf-master/rest_server.py114
-rw-r--r--docker/storperf-master/storperf/fio/fio_invoker.py5
-rw-r--r--docker/storperf-master/storperf/storperf_master.py175
-rw-r--r--docker/storperf-master/storperf/test_executor.py18
-rw-r--r--docker/storperf-master/storperf/utilities/data_handler.py4
-rw-r--r--docker/storperf-master/storperf/workloads/_base_workload.py17
-rw-r--r--docker/storperf-master/storperf/workloads/_custom_workload.py2
7 files changed, 279 insertions, 56 deletions
diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py
index 92b6c85..ce3a41c 100644
--- a/docker/storperf-master/rest_server.py
+++ b/docker/storperf-master/rest_server.py
@@ -379,11 +379,13 @@ the last stack named.
if not request.json:
abort(400, "ERROR: Missing configuration data")
+ storperf.reset_values()
self.logger.info(request.json)
try:
if ('stack_name' in request.json):
storperf.stack_name = request.json['stack_name']
+ storperf.stackless = False
if ('target' in request.json):
storperf.filename = request.json['target']
if ('deadline' in request.json):
@@ -439,7 +441,7 @@ class WorkloadsBodyModel:
@swagger.model
@swagger.nested(
- name=WorkloadsBodyModel.__name__)
+ name=WorkloadsBodyModel.__name__)
class WorkloadsNameModel:
resource_fields = {
"name": fields.Nested(WorkloadsBodyModel.resource_fields)
@@ -448,7 +450,7 @@ class WorkloadsNameModel:
@swagger.model
@swagger.nested(
- workloads=WorkloadsNameModel.__name__)
+ workloads=WorkloadsNameModel.__name__)
class WorkloadV2Model:
resource_fields = {
'target': fields.String,
@@ -457,7 +459,11 @@ class WorkloadV2Model:
'workloads': fields.Nested(WorkloadsNameModel.resource_fields),
'queue_depths': fields.String,
'block_sizes': fields.String,
- 'stack_name': fields.String
+ 'stack_name': fields.String,
+ 'username': fields.String,
+ 'password': fields.String,
+ 'ssh_private_key': fields.String,
+ 'slave_addresses': fields.List
}
required = ['workloads']
@@ -484,7 +490,19 @@ for any single test iteration.
"workloads": A JSON formatted map of workload names and parameters for FIO.
"stack_name": The target stack to use. Defaults to StorPerfAgentGroup, or
-the last stack named.
+the last stack named. Explicitly specifying null will bypass all Heat Stack
+operations and go directly against the IP addresses specified.
+
+"username": if specified, the username to use when logging into the slave.
+
+"password": if specified, the password to use when logging into the slave.
+
+"ssh_private_key": if specified, the ssh private key to use when logging
+into the slave.
+
+"slave_addresses": if specified, a list of IP addresses to use instead of
+looking all of them up from the stack.
+
""",
"required": True,
"type": "WorkloadV2Model",
@@ -505,9 +523,10 @@ the last stack named.
)
def post(self):
if not request.json:
- abort(400, "ERROR: Missing configuration data")
+ abort(400, "ERROR: Missing job data")
self.logger.info(request.json)
+ storperf.reset_values()
try:
if ('stack_name' in request.json):
@@ -534,6 +553,15 @@ the last stack named.
else:
metadata = {}
+ if 'username' in request.json:
+ storperf.username = request.json['username']
+ if 'password' in request.json:
+ storperf.password = request.json['password']
+ if 'ssh_private_key' in request.json:
+ storperf.ssh_key = request.json['ssh_private_key']
+ if 'slave_addresses' in request.json:
+ storperf.slave_addresses = request.json['slave_addresses']
+
job_id = storperf.execute_workloads(metadata)
return jsonify({'job_id': job_id})
@@ -547,7 +575,15 @@ the last stack named.
class WarmUpModel:
resource_fields = {
'stack_name': fields.String,
- 'target': fields.String
+ 'target': fields.String,
+ 'username': fields.String,
+ 'password': fields.String,
+ 'ssh_private_key': fields.String,
+ 'slave_addresses': fields.List,
+ 'mkfs': fields.String,
+ 'mount_point': fields.String,
+ 'file_size': fields.String,
+ 'file_count': fields.String
}
@@ -565,10 +601,35 @@ class Initialize(Resource):
"description": """Fill the target with random data. If no
target is specified, it will default to /dev/vdb
-"target": The target device or file to fill with random data.
+"target": The target device to use.
"stack_name": The target stack to use. Defaults to StorPerfAgentGroup, or
-the last stack named.
+the last stack named. Explicitly specifying null will bypass all Heat Stack
+operations and go directly against the IP addresses specified.
+
+"username": if specified, the username to use when logging into the slave.
+
+"password": if specified, the password to use when logging into the slave.
+
+"ssh_private_key": if specified, the ssh private key to use when logging
+into the slave.
+
+"slave_addresses": if specified, a list of IP addresses to use instead of
+looking all of them up from the stack.
+
+"mkfs": if specified, the command to execute in order to create a filesystem
+on the target device (eg: mkfs.ext4)
+
+"mount_point": if specified, the directory to use when mounting the device.
+
+"filesize": if specified, the size of the files to create when profiling
+a filesystem.
+
+"nrfiles": if specified, the number of files to create when profiling
+a filesystem
+
+"numjobs": if specified, the number of jobs for when profiling
+a filesystem
""",
"required": False,
"type": "WarmUpModel",
@@ -593,17 +654,46 @@ the last stack named.
)
def post(self):
self.logger.info(request.json)
+ storperf.reset_values()
try:
+ warm_up_args = {
+ 'rw': 'randwrite',
+ 'direct': "1",
+ 'loops': "1"
+ }
+ storperf.queue_depths = "8"
+ storperf.block_sizes = "16k"
+
if request.json:
if 'target' in request.json:
storperf.filename = request.json['target']
if 'stack_name' in request.json:
storperf.stack_name = request.json['stack_name']
- storperf.queue_depths = "8"
- storperf.block_sizes = "16k"
- storperf.workloads = "_warm_up"
- storperf.custom_workloads = None
+ if 'username' in request.json:
+ storperf.username = request.json['username']
+ if 'password' in request.json:
+ storperf.password = request.json['password']
+ if 'ssh_private_key' in request.json:
+ storperf.ssh_key = request.json['ssh_private_key']
+ if 'slave_addresses' in request.json:
+ storperf.slave_addresses = request.json['slave_addresses']
+ if 'mkfs' in request.json:
+ storperf.mkfs = request.json['mkfs']
+ if 'mount_device' in request.json:
+ storperf.mount_device = request.json['mount_device']
+ if 'filesize' in request.json:
+ warm_up_args['filesize'] = str(request.json['filesize'])
+ if 'nrfiles' in request.json:
+ warm_up_args['nrfiles'] = str(request.json['nrfiles'])
+ if 'numjobs' in request.json:
+ warm_up_args['numjobs'] = str(request.json['numjobs'])
+
+ storperf.workloads = None
+ storperf.custom_workloads = {
+ '_warm_up': warm_up_args
+ }
+ self.logger.info(storperf.custom_workloads)
job_id = storperf.execute_workloads()
return jsonify({'job_id': job_id})
diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py
index a361eec..c665598 100644
--- a/docker/storperf-master/storperf/fio/fio_invoker.py
+++ b/docker/storperf-master/storperf/fio/fio_invoker.py
@@ -158,6 +158,11 @@ class FIOInvoker(object):
username=self.metadata['username'],
password=self.metadata['password'])
return ssh
+ elif 'username' in self.metadata and 'ssh_key' in self.metadata:
+ ssh.connect(self.remote_host,
+ username=self.metadata['username'],
+ pkey=self.metadata['ssh_key'])
+ return ssh
else:
ssh.connect(self.remote_host, username='storperf',
key_filename='storperf/resources/ssh/storperf_rsa',
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index 0c7e559..76c4807 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -7,25 +7,26 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import StringIO
from datetime import datetime
+import json
import logging
+from multiprocessing.pool import ThreadPool
import os
import socket
-from threading import Thread
from time import sleep
+import uuid
import paramiko
from scp import SCPClient
-
from snaps.config.stack import StackConfig
from snaps.openstack.create_stack import OpenStackHeatStack
from snaps.openstack.os_credentials import OSCreds
from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils
from snaps.thread_utils import worker_pool
+
from storperf.db.job_db import JobDB
from storperf.test_executor import TestExecutor
-import json
-import uuid
class ParameterError(Exception):
@@ -37,8 +38,9 @@ class StorPerfMaster(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.reset_values()
+
self.job_db = JobDB()
- self._stack_name = 'StorPerfAgentGroup'
self.stack_settings = StackConfig(
name=self.stack_name,
template_path='storperf/resources/hot/agent-group.yaml')
@@ -59,21 +61,26 @@ class StorPerfMaster(object):
self.heat_stack = OpenStackHeatStack(self.os_creds,
self.stack_settings)
+
+ self._cached_stack_id = None
+ self._last_snaps_check_time = None
+ self._snaps_pool = worker_pool(20)
+
+ def reset_values(self):
+ self._stack_name = 'StorPerfAgentGroup'
self.username = None
self.password = None
+ self._ssh_key = None
self._test_executor = None
self._agent_count = 1
- self._agent_image = "Ubuntu 14.04"
- self._agent_flavor = "storperf"
+ self._agent_image = None
+ self._agent_flavor = None
self._availability_zone = None
self._public_network = None
self._volume_count = 1
self._volume_size = 1
self._volume_type = None
- self._cached_stack_id = None
- self._last_snaps_check_time = None
self._slave_addresses = []
- self._thread_pool = worker_pool(20)
self._filename = None
self._deadline = None
self._steady_state_samples = 10
@@ -83,6 +90,9 @@ class StorPerfMaster(object):
self._custom_workloads = []
self._subnet_CIDR = '172.16.0.0/16'
self.slave_info = {}
+ self.stackless = False
+ self.mkfs = None
+ self.mount_device = None
@property
def volume_count(self):
@@ -126,10 +136,14 @@ class StorPerfMaster(object):
@stack_name.setter
def stack_name(self, value):
- self._stack_name = value
- self.stack_settings.name = self.stack_name
- self.stack_id = None
- self._last_snaps_check_time = None
+ if value is None:
+ self.stackless = True
+ else:
+ self.stackless = False
+ self._stack_name = value
+ self.stack_settings.name = self.stack_name
+ self.stack_id = None
+ self._last_snaps_check_time = None
@property
def subnet_CIDR(self):
@@ -194,6 +208,10 @@ class StorPerfMaster(object):
def slave_addresses(self):
return self._slave_addresses
+ @slave_addresses.setter
+ def slave_addresses(self, value):
+ self._slave_addresses = value
+
@property
def stack_id(self):
self._get_stack_info()
@@ -204,6 +222,10 @@ class StorPerfMaster(object):
self._cached_stack_id = value
def _get_stack_info(self):
+ if self.stackless:
+ self._cached_stack_id = None
+ return None
+
if self._last_snaps_check_time is not None:
time_since_check = datetime.now() - self._last_snaps_check_time
if time_since_check.total_seconds() < 60:
@@ -216,7 +238,7 @@ class StorPerfMaster(object):
cinder_cli = cinder_utils.cinder_client(self.os_creds)
glance_cli = glance_utils.glance_client(self.os_creds)
- router_worker = self._thread_pool.apply_async(
+ router_worker = self._snaps_pool.apply_async(
self.heat_stack.get_router_creators)
vm_inst_creators = self.heat_stack.get_vm_inst_creators()
@@ -234,7 +256,7 @@ class StorPerfMaster(object):
server = vm1.get_vm_inst()
- image_worker = self._thread_pool.apply_async(
+ image_worker = self._snaps_pool.apply_async(
glance_utils.get_image_by_id, (glance_cli, server.image_id))
self._volume_count = len(server.volume_ids)
@@ -340,6 +362,19 @@ class StorPerfMaster(object):
self._custom_workloads = value
@property
+ def ssh_key(self):
+ if self._ssh_key is None:
+ return None
+ key = StringIO.StringIO(self._ssh_key)
+ pkey = paramiko.RSAKey.from_private_key(key)
+ key.close()
+ return pkey
+
+ @ssh_key.setter
+ def ssh_key(self, value):
+ self._ssh_key = value
+
+ @property
def is_stack_created(self):
return (self.stack_id is not None and
(self.heat_stack.get_status() == u'CREATE_COMPLETE' or
@@ -363,6 +398,8 @@ class StorPerfMaster(object):
return logs
def create_stack(self):
+ self.stackless = False
+
self.stack_settings.resource_files = [
'storperf/resources/hot/storperf-agent.yaml',
'storperf/resources/hot/storperf-volume.yaml']
@@ -422,7 +459,8 @@ class StorPerfMaster(object):
raise Exception("ERROR: Job {} is already running".format(
self._test_executor.job_id))
- if (self.stack_id is None):
+ if (not self.stackless and
+ self.stack_id is None):
raise ParameterError("ERROR: Stack %s does not exist" %
self.stack_name)
@@ -438,20 +476,23 @@ class StorPerfMaster(object):
slaves = self._slave_addresses
- setup_threads = []
+ setup_pool = ThreadPool(processes=len(slaves))
+ workers = []
for slave in slaves:
- t = Thread(target=self._setup_slave, args=(slave,))
- setup_threads.append(t)
- t.start()
+ worker = setup_pool.apply_async(
+ self._setup_slave, (slave,))
+ workers.append(worker)
- for thread in setup_threads:
- thread.join()
+ for worker in workers:
+ worker.get()
+
+ setup_pool.close()
self._test_executor.slaves = slaves
self._test_executor.volume_count = self.volume_count
params = metadata
- params['agent_count'] = self.agent_count
+ params['agent_count'] = len(slaves)
params['agent_flavor'] = self.agent_flavor
params['agent_image'] = self.agent_image
params['agent_info'] = json.dumps(self.slave_info)
@@ -466,9 +507,12 @@ class StorPerfMaster(object):
params['volume_count'] = self.volume_count
params['volume_size'] = self.volume_size
params['volume_type'] = self.volume_type
- if self.username and self.password:
+ if self.username:
params['username'] = self.username
+ if self.password:
params['password'] = self.password
+ if self.ssh_key:
+ params['ssh_key'] = self.ssh_key
job_id = self._test_executor.execute(params)
self.slave_info = {}
@@ -552,13 +596,23 @@ class StorPerfMaster(object):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.username and self.password:
- ssh.connect(slave,
- username=self.username,
- password=self.password)
+ ssh.connect(
+ slave,
+ username=self.username,
+ password=self.password,
+ timeout=2)
+ elif self.username and self.ssh_key:
+ ssh.connect(
+ slave,
+ username=self.username,
+ pkey=self.ssh_key,
+ timeout=2)
else:
- ssh.connect(slave, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ ssh.connect(
+ slave,
+ username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
uname = self._get_uname(ssh)
logger.debug("Slave uname is %s" % uname)
@@ -582,6 +636,12 @@ class StorPerfMaster(object):
logger.debug("Transferring fio to %s" % slave)
scp.put('/usr/local/bin/fio', '~/')
+ if self.mkfs is not None:
+ self._mkfs(ssh, logger)
+
+ if self.mount_device is not None:
+ self._mount(ssh, logger)
+
def _get_uname(self, ssh):
(_, stdout, _) = ssh.exec_command("uname -a")
return stdout.readline()
@@ -594,6 +654,59 @@ class StorPerfMaster(object):
available = lines[3]
return int(available)
+ def _mkfs(self, ssh, logger):
+ command = "sudo umount %s" % (self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Error executing on {0}: {1}".format(
+ command, error_messages))
+
+ def _mount(self, ssh, logger):
+ command = "sudo mkdir -p %s" % (self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mount %s %s" % (self.mount_device, self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Could not mount {0}: {1}".format(
+ self.mount_device, error_messages))
+
def _resize_root_fs(self, ssh, logger):
command = "sudo /usr/sbin/resize2fs /dev/vda1"
logger.info("Attempting %s" % command)
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index 53832b4..4b5bbd4 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -217,18 +217,19 @@ class TestExecutor(object):
def execute(self, metadata):
self.job_db.create_job_id()
+ self._setup_metadata(metadata)
try:
self.test_params()
except Exception as e:
self.terminate()
raise e
- self._setup_metadata(metadata)
- self.job_db.record_workload_params(metadata)
+ stripped_metadata = metadata.copy()
+ stripped_metadata.pop('ssh_key', None)
+ self.job_db.record_workload_params(stripped_metadata)
self._workload_thread = Thread(target=self.execute_workloads,
args=(),
name="Workload thread")
self._workload_thread.start()
- # seems to be hanging here
return self.job_db.job_id
def terminate(self):
@@ -362,12 +363,17 @@ class TestExecutor(object):
if self._custom_workloads:
for workload_name in self._custom_workloads.iterkeys():
- if not workload_name.isalnum():
+ real_name = workload_name
+ if real_name.startswith('_'):
+ real_name = real_name.replace('_', '')
+ self.logger.info("--- real_name: %s" % real_name)
+
+ if not real_name.isalnum():
raise InvalidWorkloadName(
"Workload name must be alphanumeric only: %s" %
- workload_name)
+ real_name)
workload = _custom_workload()
- workload.options['name'] = workload_name
+ workload.options['name'] = real_name
workload.name = workload_name
if (self.filename is not None):
workload.filename = self.filename
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
index 6e87781..98ae640 100644
--- a/docker/storperf-master/storperf/utilities/data_handler.py
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -157,9 +157,11 @@ class DataHandler(object):
test_db = os.environ.get('TEST_DB_URL')
if test_db is not None:
self.logger.info("Pushing results to %s" % (test_db))
+ stripped_metadata = executor.metadata
+ stripped_metadata.pop("ssh_key", None)
response = test_results_db.push_results_to_db(
test_db,
- executor.metadata,
+ stripped_metadata,
self.logger)
if response:
self.logger.info("Results reference: %s" % response['href'])
diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py
index 9b04314..7468fea 100644
--- a/docker/storperf-master/storperf/workloads/_base_workload.py
+++ b/docker/storperf-master/storperf/workloads/_base_workload.py
@@ -44,17 +44,24 @@ class _base_workload(object):
self.options['size'] = "100%"
self.logger.debug(
"Profiling a device, using 100% of " + self.filename)
+ self.options['filename'] = self.filename
else:
- self.options['size'] = self.default_filesize
+ if 'size' not in self.options:
+ self.options['size'] = self.default_filesize
self.logger.debug("Profiling a filesystem, using " +
- self.default_filesize + " file")
-
- self.options['filename'] = self.filename
+ self.options['size'] + " file")
+ if not self.filename.endswith('/'):
+ self.filename = self.filename + "/"
+ self.options['directory'] = self.filename
+ self.options['filename_format'] = "'storperf.$jobnum.$filenum'"
self.setup()
for key, value in self.options.iteritems():
- args.append('--' + key + "=" + value)
+ if value is not None:
+ args.append('--' + key + "=" + str(value))
+ else:
+ args.append('--' + key)
if parse_only:
args.append('--parse-only')
diff --git a/docker/storperf-master/storperf/workloads/_custom_workload.py b/docker/storperf-master/storperf/workloads/_custom_workload.py
index 9e0100d..5cd37b3 100644
--- a/docker/storperf-master/storperf/workloads/_custom_workload.py
+++ b/docker/storperf-master/storperf/workloads/_custom_workload.py
@@ -18,12 +18,12 @@ class _custom_workload(_base_workload._base_workload):
self.default_filesize = "1G"
self.filename = '/dev/vdb'
self.fixed_options = {
- 'loops': '200',
'output-format': 'json',
'status-interval': '60'
}
self.options = {
'ioengine': 'libaio',
+ 'loops': '200',
'direct': '1',
'numjobs': '1',
'rw': 'read',