author    mbeierl <mark.beierl@dell.com>  2018-10-24 17:05:37 -0400
committer mbeierl <mark.beierl@dell.com>  2018-10-29 11:02:00 -0400
commit    bf29ec6e9a5f742d71e7d5cafe009b7223f46782 (patch)
tree      bd9cee239de2c4aa4982c27864807385fd4cd61f /docker
parent    518988ca97031ef0e64e15db5ec2d8f2b86d49e9 (diff)
Add Stackless Support
Adds ability to specify an IP address list instead of looking up the
full list from OpenStack Heat. Adds stackless mode to bypass OpenStack
Heat altogether if we are running against bare metal or other nodes
that are not under Heat's domain. Adds ability to create filesystems
and mount them for profiling. Adds number of jobs and number of files
to create to the initializations API so we can lay down files and fill
them with random data ahead of the actual performance run.

Change-Id: Ia787f8b863bc92b38dd29b3cf17eda0d48f3bcd5
JIRA: STORPERF-265
Signed-off-by: mbeierl <mark.beierl@dell.com>
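For orientation before the diff: a minimal sketch of what a stackless job submission could look like once this patch is applied. The field names come from the patch itself; the endpoint URL, host, and all values are illustrative assumptions.

```python
# Hypothetical stackless job submission; field names are from this patch,
# but the endpoint path and every value shown here are assumptions.
import requests

job = {
    "stack_name": None,                    # null bypasses OpenStack Heat
    "username": "ubuntu",                  # SSH login for the slaves
    "ssh_private_key": open("id_rsa").read(),
    "slave_addresses": ["10.0.0.11", "10.0.0.12"],
    "target": "/dev/vdb",
    "workloads": {"rw": {"rw": "randrw"}}, # map of name -> fio parameters
    "queue_depths": "8",
    "block_sizes": "16k",
}
response = requests.post("http://storperf:5000/api/v2.0/jobs", json=job)
print(response.json()["job_id"])
```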
Diffstat (limited to 'docker')
-rw-r--r--  docker/storperf-master/rest_server.py                           | 114
-rw-r--r--  docker/storperf-master/storperf/fio/fio_invoker.py              |   5
-rw-r--r--  docker/storperf-master/storperf/storperf_master.py              | 175
-rw-r--r--  docker/storperf-master/storperf/test_executor.py                |  18
-rw-r--r--  docker/storperf-master/storperf/utilities/data_handler.py       |   4
-rw-r--r--  docker/storperf-master/storperf/workloads/_base_workload.py     |  17
-rw-r--r--  docker/storperf-master/storperf/workloads/_custom_workload.py   |   2
7 files changed, 279 insertions, 56 deletions
diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py
index 92b6c85..ce3a41c 100644
--- a/docker/storperf-master/rest_server.py
+++ b/docker/storperf-master/rest_server.py
@@ -379,11 +379,13 @@ the last stack named.
if not request.json:
abort(400, "ERROR: Missing configuration data")
+ storperf.reset_values()
self.logger.info(request.json)
try:
if ('stack_name' in request.json):
storperf.stack_name = request.json['stack_name']
+ storperf.stackless = False
if ('target' in request.json):
storperf.filename = request.json['target']
if ('deadline' in request.json):
@@ -439,7 +441,7 @@ class WorkloadsBodyModel:
@swagger.model
@swagger.nested(
- name=WorkloadsBodyModel.__name__)
+ name=WorkloadsBodyModel.__name__)
class WorkloadsNameModel:
resource_fields = {
"name": fields.Nested(WorkloadsBodyModel.resource_fields)
@@ -448,7 +450,7 @@ class WorkloadsNameModel:
@swagger.model
@swagger.nested(
- workloads=WorkloadsNameModel.__name__)
+ workloads=WorkloadsNameModel.__name__)
class WorkloadV2Model:
resource_fields = {
'target': fields.String,
@@ -457,7 +459,11 @@ class WorkloadV2Model:
'workloads': fields.Nested(WorkloadsNameModel.resource_fields),
'queue_depths': fields.String,
'block_sizes': fields.String,
- 'stack_name': fields.String
+ 'stack_name': fields.String,
+ 'username': fields.String,
+ 'password': fields.String,
+ 'ssh_private_key': fields.String,
+ 'slave_addresses': fields.List
}
required = ['workloads']
@@ -484,7 +490,19 @@ for any single test iteration.
"workloads": A JSON formatted map of workload names and parameters for FIO.
"stack_name": The target stack to use. Defaults to StorPerfAgentGroup, or
-the last stack named.
+the last stack named. Explicitly specifying null will bypass all Heat Stack
+operations and go directly against the IP addresses specified.
+
+"username": if specified, the username to use when logging into the slave.
+
+"password": if specified, the password to use when logging into the slave.
+
+"ssh_private_key": if specified, the ssh private key to use when logging
+into the slave.
+
+"slave_addresses": if specified, a list of IP addresses to use instead of
+looking all of them up from the stack.
+
""",
"required": True,
"type": "WorkloadV2Model",
@@ -505,9 +523,10 @@ the last stack named.
)
def post(self):
if not request.json:
- abort(400, "ERROR: Missing configuration data")
+ abort(400, "ERROR: Missing job data")
self.logger.info(request.json)
+ storperf.reset_values()
try:
if ('stack_name' in request.json):
@@ -534,6 +553,15 @@ the last stack named.
else:
metadata = {}
+ if 'username' in request.json:
+ storperf.username = request.json['username']
+ if 'password' in request.json:
+ storperf.password = request.json['password']
+ if 'ssh_private_key' in request.json:
+ storperf.ssh_key = request.json['ssh_private_key']
+ if 'slave_addresses' in request.json:
+ storperf.slave_addresses = request.json['slave_addresses']
+
job_id = storperf.execute_workloads(metadata)
return jsonify({'job_id': job_id})
@@ -547,7 +575,15 @@ the last stack named.
class WarmUpModel:
resource_fields = {
'stack_name': fields.String,
- 'target': fields.String
+ 'target': fields.String,
+ 'username': fields.String,
+ 'password': fields.String,
+ 'ssh_private_key': fields.String,
+ 'slave_addresses': fields.List,
+ 'mkfs': fields.String,
+ 'mount_device': fields.String,
+ 'filesize': fields.String,
+ 'nrfiles': fields.String,
+ 'numjobs': fields.String
}
@@ -565,10 +601,35 @@ class Initialize(Resource):
"description": """Fill the target with random data. If no
target is specified, it will default to /dev/vdb
-"target": The target device or file to fill with random data.
+"target": The target device to use.
"stack_name": The target stack to use. Defaults to StorPerfAgentGroup, or
-the last stack named.
+the last stack named. Explicitly specifying null will bypass all Heat Stack
+operations and go directly against the IP addresses specified.
+
+"username": if specified, the username to use when logging into the slave.
+
+"password": if specified, the password to use when logging into the slave.
+
+"ssh_private_key": if specified, the ssh private key to use when logging
+into the slave.
+
+"slave_addresses": if specified, a list of IP addresses to use instead of
+looking all of them up from the stack.
+
+"mkfs": if specified, the command to execute in order to create a filesystem
+on the target device (eg: mkfs.ext4)
+
+"mount_point": if specified, the directory to use when mounting the device.
+
+"filesize": if specified, the size of the files to create when profiling
+a filesystem.
+
+"nrfiles": if specified, the number of files to create when profiling
+a filesystem
+
+"numjobs": if specified, the number of jobs for when profiling
+a filesystem
""",
"required": False,
"type": "WarmUpModel",
@@ -593,17 +654,46 @@ the last stack named.
)
def post(self):
self.logger.info(request.json)
+ storperf.reset_values()
try:
+ warm_up_args = {
+ 'rw': 'randwrite',
+ 'direct': "1",
+ 'loops': "1"
+ }
+ storperf.queue_depths = "8"
+ storperf.block_sizes = "16k"
+
if request.json:
if 'target' in request.json:
storperf.filename = request.json['target']
if 'stack_name' in request.json:
storperf.stack_name = request.json['stack_name']
- storperf.queue_depths = "8"
- storperf.block_sizes = "16k"
- storperf.workloads = "_warm_up"
- storperf.custom_workloads = None
+ if 'username' in request.json:
+ storperf.username = request.json['username']
+ if 'password' in request.json:
+ storperf.password = request.json['password']
+ if 'ssh_private_key' in request.json:
+ storperf.ssh_key = request.json['ssh_private_key']
+ if 'slave_addresses' in request.json:
+ storperf.slave_addresses = request.json['slave_addresses']
+ if 'mkfs' in request.json:
+ storperf.mkfs = request.json['mkfs']
+ if 'mount_device' in request.json:
+ storperf.mount_device = request.json['mount_device']
+ if 'filesize' in request.json:
+ warm_up_args['filesize'] = str(request.json['filesize'])
+ if 'nrfiles' in request.json:
+ warm_up_args['nrfiles'] = str(request.json['nrfiles'])
+ if 'numjobs' in request.json:
+ warm_up_args['numjobs'] = str(request.json['numjobs'])
+
+ storperf.workloads = None
+ storperf.custom_workloads = {
+ '_warm_up': warm_up_args
+ }
+ self.logger.info(storperf.custom_workloads)
job_id = storperf.execute_workloads()
return jsonify({'job_id': job_id})
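To make the new options concrete, here is a sketch of an initialization request that creates and mounts a filesystem before laying down files. The keys are the ones the handler above reads; the endpoint path and all values are assumptions.

```python
# Hypothetical initialization request exercising the new filesystem
# options; keys mirror the POST handler above, values are illustrative.
import requests

init = {
    "stack_name": None,               # stackless: go straight to the IPs
    "username": "ubuntu",
    "password": "secret",
    "slave_addresses": ["10.0.0.11"],
    "target": "/mnt/storperf",        # becomes the mount point and fio directory
    "mkfs": "ext4",                   # runs "sudo mkfs.ext4 <mount_device>"
    "mount_device": "/dev/vdb",       # the device to format and mount
    "filesize": "2G",
    "nrfiles": "10",
    "numjobs": "2",
}
response = requests.post("http://storperf:5000/api/v2.0/initializations",
                         json=init)
print(response.json()["job_id"])
```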
diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py
index a361eec..c665598 100644
--- a/docker/storperf-master/storperf/fio/fio_invoker.py
+++ b/docker/storperf-master/storperf/fio/fio_invoker.py
@@ -158,6 +158,11 @@ class FIOInvoker(object):
username=self.metadata['username'],
password=self.metadata['password'])
return ssh
+ elif 'username' in self.metadata and 'ssh_key' in self.metadata:
+ ssh.connect(self.remote_host,
+ username=self.metadata['username'],
+ pkey=self.metadata['ssh_key'])
+ return ssh
else:
ssh.connect(self.remote_host, username='storperf',
key_filename='storperf/resources/ssh/storperf_rsa',
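The new branch above authenticates with an in-memory private key rather than a key file on disk. A minimal standalone sketch of that connection path (Python 2 to match the codebase; the host, username, and key location are hypothetical):

```python
# Build a paramiko key object from key text held in memory, then connect
# with pkey= instead of key_filename= (mirrors the new elif branch above).
import StringIO  # Python 2, as used by this codebase

import paramiko

key_text = open("/path/to/id_rsa").read()  # hypothetical key location
pkey = paramiko.RSAKey.from_private_key(StringIO.StringIO(key_text))

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect("10.0.0.11", username="ubuntu", pkey=pkey, timeout=2)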
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index 0c7e559..76c4807 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -7,25 +7,26 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import StringIO
from datetime import datetime
+import json
import logging
+from multiprocessing.pool import ThreadPool
import os
import socket
-from threading import Thread
from time import sleep
+import uuid
import paramiko
from scp import SCPClient
-
from snaps.config.stack import StackConfig
from snaps.openstack.create_stack import OpenStackHeatStack
from snaps.openstack.os_credentials import OSCreds
from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils
from snaps.thread_utils import worker_pool
+
from storperf.db.job_db import JobDB
from storperf.test_executor import TestExecutor
-import json
-import uuid
class ParameterError(Exception):
@@ -37,8 +38,9 @@ class StorPerfMaster(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.reset_values()
+
self.job_db = JobDB()
- self._stack_name = 'StorPerfAgentGroup'
self.stack_settings = StackConfig(
name=self.stack_name,
template_path='storperf/resources/hot/agent-group.yaml')
@@ -59,21 +61,26 @@ class StorPerfMaster(object):
self.heat_stack = OpenStackHeatStack(self.os_creds,
self.stack_settings)
+
+ self._cached_stack_id = None
+ self._last_snaps_check_time = None
+ self._snaps_pool = worker_pool(20)
+
+ def reset_values(self):
+ self._stack_name = 'StorPerfAgentGroup'
self.username = None
self.password = None
+ self._ssh_key = None
self._test_executor = None
self._agent_count = 1
- self._agent_image = "Ubuntu 14.04"
- self._agent_flavor = "storperf"
+ self._agent_image = None
+ self._agent_flavor = None
self._availability_zone = None
self._public_network = None
self._volume_count = 1
self._volume_size = 1
self._volume_type = None
- self._cached_stack_id = None
- self._last_snaps_check_time = None
self._slave_addresses = []
- self._thread_pool = worker_pool(20)
self._filename = None
self._deadline = None
self._steady_state_samples = 10
@@ -83,6 +90,9 @@ class StorPerfMaster(object):
self._custom_workloads = []
self._subnet_CIDR = '172.16.0.0/16'
self.slave_info = {}
+ self.stackless = False
+ self.mkfs = None
+ self.mount_device = None
@property
def volume_count(self):
@@ -126,10 +136,14 @@ class StorPerfMaster(object):
@stack_name.setter
def stack_name(self, value):
- self._stack_name = value
- self.stack_settings.name = self.stack_name
- self.stack_id = None
- self._last_snaps_check_time = None
+ if value is None:
+ self.stackless = True
+ else:
+ self.stackless = False
+ self._stack_name = value
+ self.stack_settings.name = self.stack_name
+ self.stack_id = None
+ self._last_snaps_check_time = None
@property
def subnet_CIDR(self):
@@ -194,6 +208,10 @@ class StorPerfMaster(object):
def slave_addresses(self):
return self._slave_addresses
+ @slave_addresses.setter
+ def slave_addresses(self, value):
+ self._slave_addresses = value
+
@property
def stack_id(self):
self._get_stack_info()
@@ -204,6 +222,10 @@ class StorPerfMaster(object):
self._cached_stack_id = value
def _get_stack_info(self):
+ if self.stackless:
+ self._cached_stack_id = None
+ return None
+
if self._last_snaps_check_time is not None:
time_since_check = datetime.now() - self._last_snaps_check_time
if time_since_check.total_seconds() < 60:
@@ -216,7 +238,7 @@ class StorPerfMaster(object):
cinder_cli = cinder_utils.cinder_client(self.os_creds)
glance_cli = glance_utils.glance_client(self.os_creds)
- router_worker = self._thread_pool.apply_async(
+ router_worker = self._snaps_pool.apply_async(
self.heat_stack.get_router_creators)
vm_inst_creators = self.heat_stack.get_vm_inst_creators()
@@ -234,7 +256,7 @@ class StorPerfMaster(object):
server = vm1.get_vm_inst()
- image_worker = self._thread_pool.apply_async(
+ image_worker = self._snaps_pool.apply_async(
glance_utils.get_image_by_id, (glance_cli, server.image_id))
self._volume_count = len(server.volume_ids)
@@ -340,6 +362,19 @@ class StorPerfMaster(object):
self._custom_workloads = value
@property
+ def ssh_key(self):
+ if self._ssh_key is None:
+ return None
+ key = StringIO.StringIO(self._ssh_key)
+ pkey = paramiko.RSAKey.from_private_key(key)
+ key.close()
+ return pkey
+
+ @ssh_key.setter
+ def ssh_key(self, value):
+ self._ssh_key = value
+
+ @property
def is_stack_created(self):
return (self.stack_id is not None and
(self.heat_stack.get_status() == u'CREATE_COMPLETE' or
@@ -363,6 +398,8 @@ class StorPerfMaster(object):
return logs
def create_stack(self):
+ self.stackless = False
+
self.stack_settings.resource_files = [
'storperf/resources/hot/storperf-agent.yaml',
'storperf/resources/hot/storperf-volume.yaml']
@@ -422,7 +459,8 @@ class StorPerfMaster(object):
raise Exception("ERROR: Job {} is already running".format(
self._test_executor.job_id))
- if (self.stack_id is None):
+ if (not self.stackless and
+ self.stack_id is None):
raise ParameterError("ERROR: Stack %s does not exist" %
self.stack_name)
@@ -438,20 +476,23 @@ class StorPerfMaster(object):
slaves = self._slave_addresses
- setup_threads = []
+ setup_pool = ThreadPool(processes=len(slaves))
+ workers = []
for slave in slaves:
- t = Thread(target=self._setup_slave, args=(slave,))
- setup_threads.append(t)
- t.start()
+ worker = setup_pool.apply_async(
+ self._setup_slave, (slave,))
+ workers.append(worker)
- for thread in setup_threads:
- thread.join()
+ for worker in workers:
+ worker.get()
+
+ setup_pool.close()
self._test_executor.slaves = slaves
self._test_executor.volume_count = self.volume_count
params = metadata
- params['agent_count'] = self.agent_count
+ params['agent_count'] = len(slaves)
params['agent_flavor'] = self.agent_flavor
params['agent_image'] = self.agent_image
params['agent_info'] = json.dumps(self.slave_info)
@@ -466,9 +507,12 @@ class StorPerfMaster(object):
params['volume_count'] = self.volume_count
params['volume_size'] = self.volume_size
params['volume_type'] = self.volume_type
- if self.username and self.password:
+ if self.username:
params['username'] = self.username
+ if self.password:
params['password'] = self.password
+ if self.ssh_key:
+ params['ssh_key'] = self.ssh_key
job_id = self._test_executor.execute(params)
self.slave_info = {}
@@ -552,13 +596,23 @@ class StorPerfMaster(object):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.username and self.password:
- ssh.connect(slave,
- username=self.username,
- password=self.password)
+ ssh.connect(
+ slave,
+ username=self.username,
+ password=self.password,
+ timeout=2)
+ elif self.username and self.ssh_key:
+ ssh.connect(
+ slave,
+ username=self.username,
+ pkey=self.ssh_key,
+ timeout=2)
else:
- ssh.connect(slave, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ ssh.connect(
+ slave,
+ username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
uname = self._get_uname(ssh)
logger.debug("Slave uname is %s" % uname)
@@ -582,6 +636,12 @@ class StorPerfMaster(object):
logger.debug("Transferring fio to %s" % slave)
scp.put('/usr/local/bin/fio', '~/')
+ if self.mkfs is not None:
+ self._mkfs(ssh, logger)
+
+ if self.mount_device is not None:
+ self._mount(ssh, logger)
+
def _get_uname(self, ssh):
(_, stdout, _) = ssh.exec_command("uname -a")
return stdout.readline()
@@ -594,6 +654,59 @@ class StorPerfMaster(object):
available = lines[3]
return int(available)
+ def _mkfs(self, ssh, logger):
+ command = "sudo umount %s" % (self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Error executing on {0}: {1}".format(
+ command, error_messages))
+
+ def _mount(self, ssh, logger):
+ command = "sudo mkdir -p %s" % (self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mount %s %s" % (self.mount_device, self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Could not mount {0}: {1}".format(
+ self.mount_device, error_messages))
+
def _resize_root_fs(self, ssh, logger):
command = "sudo /usr/sbin/resize2fs /dev/vda1"
logger.info("Attempting %s" % command)
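The switch above from raw Thread objects to a ThreadPool is worth a note: apply_async returns an AsyncResult, and calling .get() re-raises any exception from the worker, which a plain Thread join() would silently swallow. A minimal sketch of the pattern, with a stand-in for _setup_slave:

```python
# Fan out one worker per slave, then .get() each result so that any
# exception raised during setup propagates to the caller.
from multiprocessing.pool import ThreadPool

def setup_slave(slave):
    # stand-in for StorPerfMaster._setup_slave
    print("setting up %s" % slave)

slaves = ["10.0.0.11", "10.0.0.12"]
pool = ThreadPool(processes=len(slaves))
workers = [pool.apply_async(setup_slave, (slave,)) for slave in slaves]
for worker in workers:
    worker.get()   # blocks; re-raises any worker exception here
pool.close()
```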
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index 53832b4..4b5bbd4 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -217,18 +217,19 @@ class TestExecutor(object):
def execute(self, metadata):
self.job_db.create_job_id()
+ self._setup_metadata(metadata)
try:
self.test_params()
except Exception as e:
self.terminate()
raise e
- self._setup_metadata(metadata)
- self.job_db.record_workload_params(metadata)
+ stripped_metadata = metadata.copy()
+ stripped_metadata.pop('ssh_key', None)
+ self.job_db.record_workload_params(stripped_metadata)
self._workload_thread = Thread(target=self.execute_workloads,
args=(),
name="Workload thread")
self._workload_thread.start()
- # seems to be hanging here
return self.job_db.job_id
def terminate(self):
@@ -362,12 +363,17 @@ class TestExecutor(object):
if self._custom_workloads:
for workload_name in self._custom_workloads.iterkeys():
- if not workload_name.isalnum():
+ real_name = workload_name
+ if real_name.startswith('_'):
+ real_name = real_name.replace('_', '')
+ self.logger.info("--- real_name: %s" % real_name)
+
+ if not real_name.isalnum():
raise InvalidWorkloadName(
"Workload name must be alphanumeric only: %s" %
- workload_name)
+ real_name)
workload = _custom_workload()
- workload.options['name'] = workload_name
+ workload.options['name'] = real_name
workload.name = workload_name
if (self.filename is not None):
workload.filename = self.filename
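One subtlety in the renaming above: replace('_', '') strips every underscore, not just the leading one, so the internal '_warm_up' name becomes 'warmup' before the alphanumeric check. A short sketch of that normalization:

```python
# '_warm_up' passes the alphanumeric check only after ALL underscores
# are removed; 'warm_up' on its own would fail isalnum().
workload_name = "_warm_up"
real_name = workload_name
if real_name.startswith('_'):
    real_name = real_name.replace('_', '')
assert real_name == "warmup"
assert real_name.isalnum()
```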
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
index 6e87781..98ae640 100644
--- a/docker/storperf-master/storperf/utilities/data_handler.py
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -157,9 +157,11 @@ class DataHandler(object):
test_db = os.environ.get('TEST_DB_URL')
if test_db is not None:
self.logger.info("Pushing results to %s" % (test_db))
+ stripped_metadata = executor.metadata.copy()
+ stripped_metadata.pop("ssh_key", None)
response = test_results_db.push_results_to_db(
test_db,
- executor.metadata,
+ stripped_metadata,
self.logger)
if response:
self.logger.info("Results reference: %s" % response['href'])
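A note on the strip above: the ssh_key must not reach the results database, but the executor's metadata dict is still in use by the running job, so the pop has to operate on a copy (as the corrected line above does). A two-line illustration:

```python
# Pop the secret from a copy so the executor's live metadata, which the
# SSH layer still reads, is left intact.
metadata = {"ssh_key": "<pkey object>", "agent_count": 2}
stripped_metadata = metadata.copy()
stripped_metadata.pop("ssh_key", None)
assert "ssh_key" in metadata
assert "ssh_key" not in stripped_metadata
```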
diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py
index 9b04314..7468fea 100644
--- a/docker/storperf-master/storperf/workloads/_base_workload.py
+++ b/docker/storperf-master/storperf/workloads/_base_workload.py
@@ -44,17 +44,24 @@ class _base_workload(object):
self.options['size'] = "100%"
self.logger.debug(
"Profiling a device, using 100% of " + self.filename)
+ self.options['filename'] = self.filename
else:
- self.options['size'] = self.default_filesize
+ if 'size' not in self.options:
+ self.options['size'] = self.default_filesize
self.logger.debug("Profiling a filesystem, using " +
- self.default_filesize + " file")
-
- self.options['filename'] = self.filename
+ self.options['size'] + " file")
+ if not self.filename.endswith('/'):
+ self.filename = self.filename + "/"
+ self.options['directory'] = self.filename
+ self.options['filename_format'] = "'storperf.$jobnum.$filenum'"
self.setup()
for key, value in self.options.iteritems():
- args.append('--' + key + "=" + value)
+ if value is not None:
+ args.append('--' + key + "=" + str(value))
+ else:
+ args.append('--' + key)
if parse_only:
args.append('--parse-only')
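The filesystem branch above switches fio from a single 'filename' to a 'directory' plus 'filename_format', letting $jobnum and $filenum expand to one file per job/file pair. A sketch of the resulting argument construction (option values are illustrative):

```python
# Build fio CLI args the way the loop above does; with 'directory' and
# 'filename_format', fio creates storperf.<job>.<file> under the mount.
options = {
    'directory': '/mnt/storperf/',
    'filename_format': "'storperf.$jobnum.$filenum'",
    'size': '2G',        # per-file size when profiling a filesystem
    'nrfiles': '10',
    'numjobs': '2',
}
args = []
for key, value in options.items():
    if value is not None:
        args.append('--' + key + '=' + str(value))
    else:
        args.append('--' + key)
print(' '.join(args))
```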
diff --git a/docker/storperf-master/storperf/workloads/_custom_workload.py b/docker/storperf-master/storperf/workloads/_custom_workload.py
index 9e0100d..5cd37b3 100644
--- a/docker/storperf-master/storperf/workloads/_custom_workload.py
+++ b/docker/storperf-master/storperf/workloads/_custom_workload.py
@@ -18,12 +18,12 @@ class _custom_workload(_base_workload._base_workload):
self.default_filesize = "1G"
self.filename = '/dev/vdb'
self.fixed_options = {
- 'loops': '200',
'output-format': 'json',
'status-interval': '60'
}
self.options = {
'ioengine': 'libaio',
+ 'loops': '200',
'direct': '1',
'numjobs': '1',
'rw': 'read',
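Finally, moving 'loops' out of fixed_options matters for the new warm-up path: fixed options cannot be overridden by a request, while plain options can. The sketch below assumes that precedence (user args over 'options', with 'fixed_options' applied last), which is implied but not shown in this diff; the _warm_up job defined earlier sets loops=1 over the default of 200.

```python
# Assumed precedence: user-supplied args override defaults in 'options',
# while 'fixed_options' are applied last and always win.
options = {'ioengine': 'libaio', 'loops': '200', 'direct': '1', 'rw': 'read'}
fixed_options = {'output-format': 'json', 'status-interval': '60'}

warm_up_args = {'rw': 'randwrite', 'direct': '1', 'loops': '1'}
options.update(warm_up_args)   # now possible, since 'loops' moved here
options.update(fixed_options)
assert options['loops'] == '1'
```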