From bf29ec6e9a5f742d71e7d5cafe009b7223f46782 Mon Sep 17 00:00:00 2001 From: mbeierl Date: Wed, 24 Oct 2018 17:05:37 -0400 Subject: Add Stackless Support Adds ability to specify IP address list instead of looking up full list from OpenStack Heat. Adds stackless mode to bypass OpenStack heat altogether if we are running against bare metal or other nodes that are not under Heat's domain. Adds ability to create filesystems and mount them for profiling. Adds number of jobs and number of files to create to the initializations API so we can lay down files and fill them with random data ahead of the actual performance run. Change-Id: Ia787f8b863bc92b38dd29b3cf17eda0d48f3bcd5 JIRA: STORPERF-265 Signed-off-by: mbeierl --- docker/storperf-master/rest_server.py | 114 ++++++++++++-- docker/storperf-master/storperf/fio/fio_invoker.py | 5 + docker/storperf-master/storperf/storperf_master.py | 175 +++++++++++++++++---- docker/storperf-master/storperf/test_executor.py | 18 ++- .../storperf/utilities/data_handler.py | 4 +- .../storperf/workloads/_base_workload.py | 17 +- .../storperf/workloads/_custom_workload.py | 2 +- 7 files changed, 279 insertions(+), 56 deletions(-) (limited to 'docker') diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py index 92b6c85..ce3a41c 100644 --- a/docker/storperf-master/rest_server.py +++ b/docker/storperf-master/rest_server.py @@ -379,11 +379,13 @@ the last stack named. if not request.json: abort(400, "ERROR: Missing configuration data") + storperf.reset_values() self.logger.info(request.json) try: if ('stack_name' in request.json): storperf.stack_name = request.json['stack_name'] + storperf.stackless = False if ('target' in request.json): storperf.filename = request.json['target'] if ('deadline' in request.json): @@ -439,7 +441,7 @@ class WorkloadsBodyModel: @swagger.model @swagger.nested( - name=WorkloadsBodyModel.__name__) + name=WorkloadsBodyModel.__name__) class WorkloadsNameModel: resource_fields = { "name": fields.Nested(WorkloadsBodyModel.resource_fields) @@ -448,7 +450,7 @@ class WorkloadsNameModel: @swagger.model @swagger.nested( - workloads=WorkloadsNameModel.__name__) + workloads=WorkloadsNameModel.__name__) class WorkloadV2Model: resource_fields = { 'target': fields.String, @@ -457,7 +459,11 @@ class WorkloadV2Model: 'workloads': fields.Nested(WorkloadsNameModel.resource_fields), 'queue_depths': fields.String, 'block_sizes': fields.String, - 'stack_name': fields.String + 'stack_name': fields.String, + 'username': fields.String, + 'password': fields.String, + 'ssh_private_key': fields.String, + 'slave_addresses': fields.List } required = ['workloads'] @@ -484,7 +490,19 @@ for any single test iteration. "workloads": A JSON formatted map of workload names and parameters for FIO. "stack_name": The target stack to use. Defaults to StorPerfAgentGroup, or -the last stack named. +the last stack named. Explicitly specifying null will bypass all Heat Stack +operations and go directly against the IP addresses specified. + +"username": if specified, the username to use when logging into the slave. + +"password": if specified, the password to use when logging into the slave. + +"ssh_private_key": if specified, the ssh private key to use when logging +into the slave. + +"slave_addresses": if specified, a list of IP addresses to use instead of +looking all of them up from the stack. + """, "required": True, "type": "WorkloadV2Model", @@ -505,9 +523,10 @@ the last stack named. ) def post(self): if not request.json: - abort(400, "ERROR: Missing configuration data") + abort(400, "ERROR: Missing job data") self.logger.info(request.json) + storperf.reset_values() try: if ('stack_name' in request.json): @@ -534,6 +553,15 @@ the last stack named. else: metadata = {} + if 'username' in request.json: + storperf.username = request.json['username'] + if 'password' in request.json: + storperf.password = request.json['password'] + if 'ssh_private_key' in request.json: + storperf.ssh_key = request.json['ssh_private_key'] + if 'slave_addresses' in request.json: + storperf.slave_addresses = request.json['slave_addresses'] + job_id = storperf.execute_workloads(metadata) return jsonify({'job_id': job_id}) @@ -547,7 +575,15 @@ the last stack named. class WarmUpModel: resource_fields = { 'stack_name': fields.String, - 'target': fields.String + 'target': fields.String, + 'username': fields.String, + 'password': fields.String, + 'ssh_private_key': fields.String, + 'slave_addresses': fields.List, + 'mkfs': fields.String, + 'mount_point': fields.String, + 'file_size': fields.String, + 'file_count': fields.String } @@ -565,10 +601,35 @@ class Initialize(Resource): "description": """Fill the target with random data. If no target is specified, it will default to /dev/vdb -"target": The target device or file to fill with random data. +"target": The target device to use. "stack_name": The target stack to use. Defaults to StorPerfAgentGroup, or -the last stack named. +the last stack named. Explicitly specifying null will bypass all Heat Stack +operations and go directly against the IP addresses specified. + +"username": if specified, the username to use when logging into the slave. + +"password": if specified, the password to use when logging into the slave. + +"ssh_private_key": if specified, the ssh private key to use when logging +into the slave. + +"slave_addresses": if specified, a list of IP addresses to use instead of +looking all of them up from the stack. + +"mkfs": if specified, the command to execute in order to create a filesystem +on the target device (eg: mkfs.ext4) + +"mount_point": if specified, the directory to use when mounting the device. + +"filesize": if specified, the size of the files to create when profiling +a filesystem. + +"nrfiles": if specified, the number of files to create when profiling +a filesystem + +"numjobs": if specified, the number of jobs for when profiling +a filesystem """, "required": False, "type": "WarmUpModel", @@ -593,17 +654,46 @@ the last stack named. ) def post(self): self.logger.info(request.json) + storperf.reset_values() try: + warm_up_args = { + 'rw': 'randwrite', + 'direct': "1", + 'loops': "1" + } + storperf.queue_depths = "8" + storperf.block_sizes = "16k" + if request.json: if 'target' in request.json: storperf.filename = request.json['target'] if 'stack_name' in request.json: storperf.stack_name = request.json['stack_name'] - storperf.queue_depths = "8" - storperf.block_sizes = "16k" - storperf.workloads = "_warm_up" - storperf.custom_workloads = None + if 'username' in request.json: + storperf.username = request.json['username'] + if 'password' in request.json: + storperf.password = request.json['password'] + if 'ssh_private_key' in request.json: + storperf.ssh_key = request.json['ssh_private_key'] + if 'slave_addresses' in request.json: + storperf.slave_addresses = request.json['slave_addresses'] + if 'mkfs' in request.json: + storperf.mkfs = request.json['mkfs'] + if 'mount_device' in request.json: + storperf.mount_device = request.json['mount_device'] + if 'filesize' in request.json: + warm_up_args['filesize'] = str(request.json['filesize']) + if 'nrfiles' in request.json: + warm_up_args['nrfiles'] = str(request.json['nrfiles']) + if 'numjobs' in request.json: + warm_up_args['numjobs'] = str(request.json['numjobs']) + + storperf.workloads = None + storperf.custom_workloads = { + '_warm_up': warm_up_args + } + self.logger.info(storperf.custom_workloads) job_id = storperf.execute_workloads() return jsonify({'job_id': job_id}) diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py index a361eec..c665598 100644 --- a/docker/storperf-master/storperf/fio/fio_invoker.py +++ b/docker/storperf-master/storperf/fio/fio_invoker.py @@ -158,6 +158,11 @@ class FIOInvoker(object): username=self.metadata['username'], password=self.metadata['password']) return ssh + elif 'username' in self.metadata and 'ssh_key' in self.metadata: + ssh.connect(self.remote_host, + username=self.metadata['username'], + pkey=self.metadata['ssh_key']) + return ssh else: ssh.connect(self.remote_host, username='storperf', key_filename='storperf/resources/ssh/storperf_rsa', diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index 0c7e559..76c4807 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -7,25 +7,26 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import StringIO from datetime import datetime +import json import logging +from multiprocessing.pool import ThreadPool import os import socket -from threading import Thread from time import sleep +import uuid import paramiko from scp import SCPClient - from snaps.config.stack import StackConfig from snaps.openstack.create_stack import OpenStackHeatStack from snaps.openstack.os_credentials import OSCreds from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils from snaps.thread_utils import worker_pool + from storperf.db.job_db import JobDB from storperf.test_executor import TestExecutor -import json -import uuid class ParameterError(Exception): @@ -37,8 +38,9 @@ class StorPerfMaster(object): def __init__(self): self.logger = logging.getLogger(__name__) + self.reset_values() + self.job_db = JobDB() - self._stack_name = 'StorPerfAgentGroup' self.stack_settings = StackConfig( name=self.stack_name, template_path='storperf/resources/hot/agent-group.yaml') @@ -59,21 +61,26 @@ class StorPerfMaster(object): self.heat_stack = OpenStackHeatStack(self.os_creds, self.stack_settings) + + self._cached_stack_id = None + self._last_snaps_check_time = None + self._snaps_pool = worker_pool(20) + + def reset_values(self): + self._stack_name = 'StorPerfAgentGroup' self.username = None self.password = None + self._ssh_key = None self._test_executor = None self._agent_count = 1 - self._agent_image = "Ubuntu 14.04" - self._agent_flavor = "storperf" + self._agent_image = None + self._agent_flavor = None self._availability_zone = None self._public_network = None self._volume_count = 1 self._volume_size = 1 self._volume_type = None - self._cached_stack_id = None - self._last_snaps_check_time = None self._slave_addresses = [] - self._thread_pool = worker_pool(20) self._filename = None self._deadline = None self._steady_state_samples = 10 @@ -83,6 +90,9 @@ class StorPerfMaster(object): self._custom_workloads = [] self._subnet_CIDR = '172.16.0.0/16' self.slave_info = {} + self.stackless = False + self.mkfs = None + self.mount_device = None @property def volume_count(self): @@ -126,10 +136,14 @@ class StorPerfMaster(object): @stack_name.setter def stack_name(self, value): - self._stack_name = value - self.stack_settings.name = self.stack_name - self.stack_id = None - self._last_snaps_check_time = None + if value is None: + self.stackless = True + else: + self.stackless = False + self._stack_name = value + self.stack_settings.name = self.stack_name + self.stack_id = None + self._last_snaps_check_time = None @property def subnet_CIDR(self): @@ -194,6 +208,10 @@ class StorPerfMaster(object): def slave_addresses(self): return self._slave_addresses + @slave_addresses.setter + def slave_addresses(self, value): + self._slave_addresses = value + @property def stack_id(self): self._get_stack_info() @@ -204,6 +222,10 @@ class StorPerfMaster(object): self._cached_stack_id = value def _get_stack_info(self): + if self.stackless: + self._cached_stack_id = None + return None + if self._last_snaps_check_time is not None: time_since_check = datetime.now() - self._last_snaps_check_time if time_since_check.total_seconds() < 60: @@ -216,7 +238,7 @@ class StorPerfMaster(object): cinder_cli = cinder_utils.cinder_client(self.os_creds) glance_cli = glance_utils.glance_client(self.os_creds) - router_worker = self._thread_pool.apply_async( + router_worker = self._snaps_pool.apply_async( self.heat_stack.get_router_creators) vm_inst_creators = self.heat_stack.get_vm_inst_creators() @@ -234,7 +256,7 @@ class StorPerfMaster(object): server = vm1.get_vm_inst() - image_worker = self._thread_pool.apply_async( + image_worker = self._snaps_pool.apply_async( glance_utils.get_image_by_id, (glance_cli, server.image_id)) self._volume_count = len(server.volume_ids) @@ -339,6 +361,19 @@ class StorPerfMaster(object): self.logger.info("Custom workloads = %s" % value) self._custom_workloads = value + @property + def ssh_key(self): + if self._ssh_key is None: + return None + key = StringIO.StringIO(self._ssh_key) + pkey = paramiko.RSAKey.from_private_key(key) + key.close() + return pkey + + @ssh_key.setter + def ssh_key(self, value): + self._ssh_key = value + @property def is_stack_created(self): return (self.stack_id is not None and @@ -363,6 +398,8 @@ class StorPerfMaster(object): return logs def create_stack(self): + self.stackless = False + self.stack_settings.resource_files = [ 'storperf/resources/hot/storperf-agent.yaml', 'storperf/resources/hot/storperf-volume.yaml'] @@ -422,7 +459,8 @@ class StorPerfMaster(object): raise Exception("ERROR: Job {} is already running".format( self._test_executor.job_id)) - if (self.stack_id is None): + if (not self.stackless and + self.stack_id is None): raise ParameterError("ERROR: Stack %s does not exist" % self.stack_name) @@ -438,20 +476,23 @@ class StorPerfMaster(object): slaves = self._slave_addresses - setup_threads = [] + setup_pool = ThreadPool(processes=len(slaves)) + workers = [] for slave in slaves: - t = Thread(target=self._setup_slave, args=(slave,)) - setup_threads.append(t) - t.start() + worker = setup_pool.apply_async( + self._setup_slave, (slave,)) + workers.append(worker) - for thread in setup_threads: - thread.join() + for worker in workers: + worker.get() + + setup_pool.close() self._test_executor.slaves = slaves self._test_executor.volume_count = self.volume_count params = metadata - params['agent_count'] = self.agent_count + params['agent_count'] = len(slaves) params['agent_flavor'] = self.agent_flavor params['agent_image'] = self.agent_image params['agent_info'] = json.dumps(self.slave_info) @@ -466,9 +507,12 @@ class StorPerfMaster(object): params['volume_count'] = self.volume_count params['volume_size'] = self.volume_size params['volume_type'] = self.volume_type - if self.username and self.password: + if self.username: params['username'] = self.username + if self.password: params['password'] = self.password + if self.ssh_key: + params['ssh_key'] = self.ssh_key job_id = self._test_executor.execute(params) self.slave_info = {} @@ -552,13 +596,23 @@ class StorPerfMaster(object): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.username and self.password: - ssh.connect(slave, - username=self.username, - password=self.password) + ssh.connect( + slave, + username=self.username, + password=self.password, + timeout=2) + elif self.username and self.ssh_key: + ssh.connect( + slave, + username=self.username, + pkey=self.ssh_key, + timeout=2) else: - ssh.connect(slave, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh.connect( + slave, + username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) uname = self._get_uname(ssh) logger.debug("Slave uname is %s" % uname) @@ -582,6 +636,12 @@ class StorPerfMaster(object): logger.debug("Transferring fio to %s" % slave) scp.put('/usr/local/bin/fio', '~/') + if self.mkfs is not None: + self._mkfs(ssh, logger) + + if self.mount_device is not None: + self._mount(ssh, logger) + def _get_uname(self, ssh): (_, stdout, _) = ssh.exec_command("uname -a") return stdout.readline() @@ -594,6 +654,59 @@ class StorPerfMaster(object): available = lines[3] return int(available) + def _mkfs(self, ssh, logger): + command = "sudo umount %s" % (self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Error executing on {0}: {1}".format( + command, error_messages)) + + def _mount(self, ssh, logger): + command = "sudo mkdir -p %s" % (self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mount %s %s" % (self.mount_device, self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Could not mount {0}: {1}".format( + self.mount_device, error_messages)) + def _resize_root_fs(self, ssh, logger): command = "sudo /usr/sbin/resize2fs /dev/vda1" logger.info("Attempting %s" % command) diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py index 53832b4..4b5bbd4 100644 --- a/docker/storperf-master/storperf/test_executor.py +++ b/docker/storperf-master/storperf/test_executor.py @@ -217,18 +217,19 @@ class TestExecutor(object): def execute(self, metadata): self.job_db.create_job_id() + self._setup_metadata(metadata) try: self.test_params() except Exception as e: self.terminate() raise e - self._setup_metadata(metadata) - self.job_db.record_workload_params(metadata) + stripped_metadata = metadata.copy() + stripped_metadata.pop('ssh_key', None) + self.job_db.record_workload_params(stripped_metadata) self._workload_thread = Thread(target=self.execute_workloads, args=(), name="Workload thread") self._workload_thread.start() - # seems to be hanging here return self.job_db.job_id def terminate(self): @@ -362,12 +363,17 @@ class TestExecutor(object): if self._custom_workloads: for workload_name in self._custom_workloads.iterkeys(): - if not workload_name.isalnum(): + real_name = workload_name + if real_name.startswith('_'): + real_name = real_name.replace('_', '') + self.logger.info("--- real_name: %s" % real_name) + + if not real_name.isalnum(): raise InvalidWorkloadName( "Workload name must be alphanumeric only: %s" % - workload_name) + real_name) workload = _custom_workload() - workload.options['name'] = workload_name + workload.options['name'] = real_name workload.name = workload_name if (self.filename is not None): workload.filename = self.filename diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py index 6e87781..98ae640 100644 --- a/docker/storperf-master/storperf/utilities/data_handler.py +++ b/docker/storperf-master/storperf/utilities/data_handler.py @@ -157,9 +157,11 @@ class DataHandler(object): test_db = os.environ.get('TEST_DB_URL') if test_db is not None: self.logger.info("Pushing results to %s" % (test_db)) + stripped_metadata = executor.metadata + stripped_metadata.pop("ssh_key", None) response = test_results_db.push_results_to_db( test_db, - executor.metadata, + stripped_metadata, self.logger) if response: self.logger.info("Results reference: %s" % response['href']) diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py index 9b04314..7468fea 100644 --- a/docker/storperf-master/storperf/workloads/_base_workload.py +++ b/docker/storperf-master/storperf/workloads/_base_workload.py @@ -44,17 +44,24 @@ class _base_workload(object): self.options['size'] = "100%" self.logger.debug( "Profiling a device, using 100% of " + self.filename) + self.options['filename'] = self.filename else: - self.options['size'] = self.default_filesize + if 'size' not in self.options: + self.options['size'] = self.default_filesize self.logger.debug("Profiling a filesystem, using " + - self.default_filesize + " file") - - self.options['filename'] = self.filename + self.options['size'] + " file") + if not self.filename.endswith('/'): + self.filename = self.filename + "/" + self.options['directory'] = self.filename + self.options['filename_format'] = "'storperf.$jobnum.$filenum'" self.setup() for key, value in self.options.iteritems(): - args.append('--' + key + "=" + value) + if value is not None: + args.append('--' + key + "=" + str(value)) + else: + args.append('--' + key) if parse_only: args.append('--parse-only') diff --git a/docker/storperf-master/storperf/workloads/_custom_workload.py b/docker/storperf-master/storperf/workloads/_custom_workload.py index 9e0100d..5cd37b3 100644 --- a/docker/storperf-master/storperf/workloads/_custom_workload.py +++ b/docker/storperf-master/storperf/workloads/_custom_workload.py @@ -18,12 +18,12 @@ class _custom_workload(_base_workload._base_workload): self.default_filesize = "1G" self.filename = '/dev/vdb' self.fixed_options = { - 'loops': '200', 'output-format': 'json', 'status-interval': '60' } self.options = { 'ioengine': 'libaio', + 'loops': '200', 'direct': '1', 'numjobs': '1', 'rw': 'read', -- cgit 1.2.3-korg