diff options
author | Mark Beierl <mark.beierl@dell.com> | 2018-11-05 18:38:17 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@opnfv.org> | 2018-11-05 18:38:17 +0000 |
commit | c7842191ede010b05366204668cb3c80518461e5 (patch) | |
tree | 010c60e684d8b3f591d7af926caf899fb0994e94 /docker/storperf-master | |
parent | c46cd425bd8fd2c9b652a14d2e2400e3de644fbe (diff) | |
parent | bf29ec6e9a5f742d71e7d5cafe009b7223f46782 (diff) |
Merge "Add Stackless Support"
Diffstat (limited to 'docker/storperf-master')
7 files changed, 279 insertions, 56 deletions
diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py index 92b6c85..ce3a41c 100644 --- a/docker/storperf-master/rest_server.py +++ b/docker/storperf-master/rest_server.py @@ -379,11 +379,13 @@ the last stack named. if not request.json: abort(400, "ERROR: Missing configuration data") + storperf.reset_values() self.logger.info(request.json) try: if ('stack_name' in request.json): storperf.stack_name = request.json['stack_name'] + storperf.stackless = False if ('target' in request.json): storperf.filename = request.json['target'] if ('deadline' in request.json): @@ -439,7 +441,7 @@ class WorkloadsBodyModel: @swagger.model @swagger.nested( - name=WorkloadsBodyModel.__name__) + name=WorkloadsBodyModel.__name__) class WorkloadsNameModel: resource_fields = { "name": fields.Nested(WorkloadsBodyModel.resource_fields) @@ -448,7 +450,7 @@ class WorkloadsNameModel: @swagger.model @swagger.nested( - workloads=WorkloadsNameModel.__name__) + workloads=WorkloadsNameModel.__name__) class WorkloadV2Model: resource_fields = { 'target': fields.String, @@ -457,7 +459,11 @@ class WorkloadV2Model: 'workloads': fields.Nested(WorkloadsNameModel.resource_fields), 'queue_depths': fields.String, 'block_sizes': fields.String, - 'stack_name': fields.String + 'stack_name': fields.String, + 'username': fields.String, + 'password': fields.String, + 'ssh_private_key': fields.String, + 'slave_addresses': fields.List } required = ['workloads'] @@ -484,7 +490,19 @@ for any single test iteration. "workloads": A JSON formatted map of workload names and parameters for FIO. "stack_name": The target stack to use. Defaults to StorPerfAgentGroup, or -the last stack named. +the last stack named. Explicitly specifying null will bypass all Heat Stack +operations and go directly against the IP addresses specified. + +"username": if specified, the username to use when logging into the slave. + +"password": if specified, the password to use when logging into the slave. + +"ssh_private_key": if specified, the ssh private key to use when logging +into the slave. + +"slave_addresses": if specified, a list of IP addresses to use instead of +looking all of them up from the stack. + """, "required": True, "type": "WorkloadV2Model", @@ -505,9 +523,10 @@ the last stack named. ) def post(self): if not request.json: - abort(400, "ERROR: Missing configuration data") + abort(400, "ERROR: Missing job data") self.logger.info(request.json) + storperf.reset_values() try: if ('stack_name' in request.json): @@ -534,6 +553,15 @@ the last stack named. else: metadata = {} + if 'username' in request.json: + storperf.username = request.json['username'] + if 'password' in request.json: + storperf.password = request.json['password'] + if 'ssh_private_key' in request.json: + storperf.ssh_key = request.json['ssh_private_key'] + if 'slave_addresses' in request.json: + storperf.slave_addresses = request.json['slave_addresses'] + job_id = storperf.execute_workloads(metadata) return jsonify({'job_id': job_id}) @@ -547,7 +575,15 @@ the last stack named. class WarmUpModel: resource_fields = { 'stack_name': fields.String, - 'target': fields.String + 'target': fields.String, + 'username': fields.String, + 'password': fields.String, + 'ssh_private_key': fields.String, + 'slave_addresses': fields.List, + 'mkfs': fields.String, + 'mount_point': fields.String, + 'file_size': fields.String, + 'file_count': fields.String } @@ -565,10 +601,35 @@ class Initialize(Resource): "description": """Fill the target with random data. If no target is specified, it will default to /dev/vdb -"target": The target device or file to fill with random data. +"target": The target device to use. "stack_name": The target stack to use. Defaults to StorPerfAgentGroup, or -the last stack named. +the last stack named. Explicitly specifying null will bypass all Heat Stack +operations and go directly against the IP addresses specified. + +"username": if specified, the username to use when logging into the slave. + +"password": if specified, the password to use when logging into the slave. + +"ssh_private_key": if specified, the ssh private key to use when logging +into the slave. + +"slave_addresses": if specified, a list of IP addresses to use instead of +looking all of them up from the stack. + +"mkfs": if specified, the command to execute in order to create a filesystem +on the target device (eg: mkfs.ext4) + +"mount_point": if specified, the directory to use when mounting the device. + +"filesize": if specified, the size of the files to create when profiling +a filesystem. + +"nrfiles": if specified, the number of files to create when profiling +a filesystem + +"numjobs": if specified, the number of jobs for when profiling +a filesystem """, "required": False, "type": "WarmUpModel", @@ -593,17 +654,46 @@ the last stack named. ) def post(self): self.logger.info(request.json) + storperf.reset_values() try: + warm_up_args = { + 'rw': 'randwrite', + 'direct': "1", + 'loops': "1" + } + storperf.queue_depths = "8" + storperf.block_sizes = "16k" + if request.json: if 'target' in request.json: storperf.filename = request.json['target'] if 'stack_name' in request.json: storperf.stack_name = request.json['stack_name'] - storperf.queue_depths = "8" - storperf.block_sizes = "16k" - storperf.workloads = "_warm_up" - storperf.custom_workloads = None + if 'username' in request.json: + storperf.username = request.json['username'] + if 'password' in request.json: + storperf.password = request.json['password'] + if 'ssh_private_key' in request.json: + storperf.ssh_key = request.json['ssh_private_key'] + if 'slave_addresses' in request.json: + storperf.slave_addresses = request.json['slave_addresses'] + if 'mkfs' in request.json: + storperf.mkfs = request.json['mkfs'] + if 'mount_device' in request.json: + storperf.mount_device = request.json['mount_device'] + if 'filesize' in request.json: + warm_up_args['filesize'] = str(request.json['filesize']) + if 'nrfiles' in request.json: + warm_up_args['nrfiles'] = str(request.json['nrfiles']) + if 'numjobs' in request.json: + warm_up_args['numjobs'] = str(request.json['numjobs']) + + storperf.workloads = None + storperf.custom_workloads = { + '_warm_up': warm_up_args + } + self.logger.info(storperf.custom_workloads) job_id = storperf.execute_workloads() return jsonify({'job_id': job_id}) diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py index a361eec..c665598 100644 --- a/docker/storperf-master/storperf/fio/fio_invoker.py +++ b/docker/storperf-master/storperf/fio/fio_invoker.py @@ -158,6 +158,11 @@ class FIOInvoker(object): username=self.metadata['username'], password=self.metadata['password']) return ssh + elif 'username' in self.metadata and 'ssh_key' in self.metadata: + ssh.connect(self.remote_host, + username=self.metadata['username'], + pkey=self.metadata['ssh_key']) + return ssh else: ssh.connect(self.remote_host, username='storperf', key_filename='storperf/resources/ssh/storperf_rsa', diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index 0c7e559..76c4807 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -7,25 +7,26 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import StringIO from datetime import datetime +import json import logging +from multiprocessing.pool import ThreadPool import os import socket -from threading import Thread from time import sleep +import uuid import paramiko from scp import SCPClient - from snaps.config.stack import StackConfig from snaps.openstack.create_stack import OpenStackHeatStack from snaps.openstack.os_credentials import OSCreds from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils from snaps.thread_utils import worker_pool + from storperf.db.job_db import JobDB from storperf.test_executor import TestExecutor -import json -import uuid class ParameterError(Exception): @@ -37,8 +38,9 @@ class StorPerfMaster(object): def __init__(self): self.logger = logging.getLogger(__name__) + self.reset_values() + self.job_db = JobDB() - self._stack_name = 'StorPerfAgentGroup' self.stack_settings = StackConfig( name=self.stack_name, template_path='storperf/resources/hot/agent-group.yaml') @@ -59,21 +61,26 @@ class StorPerfMaster(object): self.heat_stack = OpenStackHeatStack(self.os_creds, self.stack_settings) + + self._cached_stack_id = None + self._last_snaps_check_time = None + self._snaps_pool = worker_pool(20) + + def reset_values(self): + self._stack_name = 'StorPerfAgentGroup' self.username = None self.password = None + self._ssh_key = None self._test_executor = None self._agent_count = 1 - self._agent_image = "Ubuntu 14.04" - self._agent_flavor = "storperf" + self._agent_image = None + self._agent_flavor = None self._availability_zone = None self._public_network = None self._volume_count = 1 self._volume_size = 1 self._volume_type = None - self._cached_stack_id = None - self._last_snaps_check_time = None self._slave_addresses = [] - self._thread_pool = worker_pool(20) self._filename = None self._deadline = None self._steady_state_samples = 10 @@ -83,6 +90,9 @@ class StorPerfMaster(object): self._custom_workloads = [] self._subnet_CIDR = '172.16.0.0/16' self.slave_info = {} + self.stackless = False + self.mkfs = None + self.mount_device = None @property def volume_count(self): @@ -126,10 +136,14 @@ class StorPerfMaster(object): @stack_name.setter def stack_name(self, value): - self._stack_name = value - self.stack_settings.name = self.stack_name - self.stack_id = None - self._last_snaps_check_time = None + if value is None: + self.stackless = True + else: + self.stackless = False + self._stack_name = value + self.stack_settings.name = self.stack_name + self.stack_id = None + self._last_snaps_check_time = None @property def subnet_CIDR(self): @@ -194,6 +208,10 @@ class StorPerfMaster(object): def slave_addresses(self): return self._slave_addresses + @slave_addresses.setter + def slave_addresses(self, value): + self._slave_addresses = value + @property def stack_id(self): self._get_stack_info() @@ -204,6 +222,10 @@ class StorPerfMaster(object): self._cached_stack_id = value def _get_stack_info(self): + if self.stackless: + self._cached_stack_id = None + return None + if self._last_snaps_check_time is not None: time_since_check = datetime.now() - self._last_snaps_check_time if time_since_check.total_seconds() < 60: @@ -216,7 +238,7 @@ class StorPerfMaster(object): cinder_cli = cinder_utils.cinder_client(self.os_creds) glance_cli = glance_utils.glance_client(self.os_creds) - router_worker = self._thread_pool.apply_async( + router_worker = self._snaps_pool.apply_async( self.heat_stack.get_router_creators) vm_inst_creators = self.heat_stack.get_vm_inst_creators() @@ -234,7 +256,7 @@ class StorPerfMaster(object): server = vm1.get_vm_inst() - image_worker = self._thread_pool.apply_async( + image_worker = self._snaps_pool.apply_async( glance_utils.get_image_by_id, (glance_cli, server.image_id)) self._volume_count = len(server.volume_ids) @@ -340,6 +362,19 @@ class StorPerfMaster(object): self._custom_workloads = value @property + def ssh_key(self): + if self._ssh_key is None: + return None + key = StringIO.StringIO(self._ssh_key) + pkey = paramiko.RSAKey.from_private_key(key) + key.close() + return pkey + + @ssh_key.setter + def ssh_key(self, value): + self._ssh_key = value + + @property def is_stack_created(self): return (self.stack_id is not None and (self.heat_stack.get_status() == u'CREATE_COMPLETE' or @@ -363,6 +398,8 @@ class StorPerfMaster(object): return logs def create_stack(self): + self.stackless = False + self.stack_settings.resource_files = [ 'storperf/resources/hot/storperf-agent.yaml', 'storperf/resources/hot/storperf-volume.yaml'] @@ -422,7 +459,8 @@ class StorPerfMaster(object): raise Exception("ERROR: Job {} is already running".format( self._test_executor.job_id)) - if (self.stack_id is None): + if (not self.stackless and + self.stack_id is None): raise ParameterError("ERROR: Stack %s does not exist" % self.stack_name) @@ -438,20 +476,23 @@ class StorPerfMaster(object): slaves = self._slave_addresses - setup_threads = [] + setup_pool = ThreadPool(processes=len(slaves)) + workers = [] for slave in slaves: - t = Thread(target=self._setup_slave, args=(slave,)) - setup_threads.append(t) - t.start() + worker = setup_pool.apply_async( + self._setup_slave, (slave,)) + workers.append(worker) - for thread in setup_threads: - thread.join() + for worker in workers: + worker.get() + + setup_pool.close() self._test_executor.slaves = slaves self._test_executor.volume_count = self.volume_count params = metadata - params['agent_count'] = self.agent_count + params['agent_count'] = len(slaves) params['agent_flavor'] = self.agent_flavor params['agent_image'] = self.agent_image params['agent_info'] = json.dumps(self.slave_info) @@ -466,9 +507,12 @@ class StorPerfMaster(object): params['volume_count'] = self.volume_count params['volume_size'] = self.volume_size params['volume_type'] = self.volume_type - if self.username and self.password: + if self.username: params['username'] = self.username + if self.password: params['password'] = self.password + if self.ssh_key: + params['ssh_key'] = self.ssh_key job_id = self._test_executor.execute(params) self.slave_info = {} @@ -552,13 +596,23 @@ class StorPerfMaster(object): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.username and self.password: - ssh.connect(slave, - username=self.username, - password=self.password) + ssh.connect( + slave, + username=self.username, + password=self.password, + timeout=2) + elif self.username and self.ssh_key: + ssh.connect( + slave, + username=self.username, + pkey=self.ssh_key, + timeout=2) else: - ssh.connect(slave, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh.connect( + slave, + username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) uname = self._get_uname(ssh) logger.debug("Slave uname is %s" % uname) @@ -582,6 +636,12 @@ class StorPerfMaster(object): logger.debug("Transferring fio to %s" % slave) scp.put('/usr/local/bin/fio', '~/') + if self.mkfs is not None: + self._mkfs(ssh, logger) + + if self.mount_device is not None: + self._mount(ssh, logger) + def _get_uname(self, ssh): (_, stdout, _) = ssh.exec_command("uname -a") return stdout.readline() @@ -594,6 +654,59 @@ class StorPerfMaster(object): available = lines[3] return int(available) + def _mkfs(self, ssh, logger): + command = "sudo umount %s" % (self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Error executing on {0}: {1}".format( + command, error_messages)) + + def _mount(self, ssh, logger): + command = "sudo mkdir -p %s" % (self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mount %s %s" % (self.mount_device, self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Could not mount {0}: {1}".format( + self.mount_device, error_messages)) + def _resize_root_fs(self, ssh, logger): command = "sudo /usr/sbin/resize2fs /dev/vda1" logger.info("Attempting %s" % command) diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py index 53832b4..4b5bbd4 100644 --- a/docker/storperf-master/storperf/test_executor.py +++ b/docker/storperf-master/storperf/test_executor.py @@ -217,18 +217,19 @@ class TestExecutor(object): def execute(self, metadata): self.job_db.create_job_id() + self._setup_metadata(metadata) try: self.test_params() except Exception as e: self.terminate() raise e - self._setup_metadata(metadata) - self.job_db.record_workload_params(metadata) + stripped_metadata = metadata.copy() + stripped_metadata.pop('ssh_key', None) + self.job_db.record_workload_params(stripped_metadata) self._workload_thread = Thread(target=self.execute_workloads, args=(), name="Workload thread") self._workload_thread.start() - # seems to be hanging here return self.job_db.job_id def terminate(self): @@ -362,12 +363,17 @@ class TestExecutor(object): if self._custom_workloads: for workload_name in self._custom_workloads.iterkeys(): - if not workload_name.isalnum(): + real_name = workload_name + if real_name.startswith('_'): + real_name = real_name.replace('_', '') + self.logger.info("--- real_name: %s" % real_name) + + if not real_name.isalnum(): raise InvalidWorkloadName( "Workload name must be alphanumeric only: %s" % - workload_name) + real_name) workload = _custom_workload() - workload.options['name'] = workload_name + workload.options['name'] = real_name workload.name = workload_name if (self.filename is not None): workload.filename = self.filename diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py index 6e87781..98ae640 100644 --- a/docker/storperf-master/storperf/utilities/data_handler.py +++ b/docker/storperf-master/storperf/utilities/data_handler.py @@ -157,9 +157,11 @@ class DataHandler(object): test_db = os.environ.get('TEST_DB_URL') if test_db is not None: self.logger.info("Pushing results to %s" % (test_db)) + stripped_metadata = executor.metadata + stripped_metadata.pop("ssh_key", None) response = test_results_db.push_results_to_db( test_db, - executor.metadata, + stripped_metadata, self.logger) if response: self.logger.info("Results reference: %s" % response['href']) diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py index 9b04314..7468fea 100644 --- a/docker/storperf-master/storperf/workloads/_base_workload.py +++ b/docker/storperf-master/storperf/workloads/_base_workload.py @@ -44,17 +44,24 @@ class _base_workload(object): self.options['size'] = "100%" self.logger.debug( "Profiling a device, using 100% of " + self.filename) + self.options['filename'] = self.filename else: - self.options['size'] = self.default_filesize + if 'size' not in self.options: + self.options['size'] = self.default_filesize self.logger.debug("Profiling a filesystem, using " + - self.default_filesize + " file") - - self.options['filename'] = self.filename + self.options['size'] + " file") + if not self.filename.endswith('/'): + self.filename = self.filename + "/" + self.options['directory'] = self.filename + self.options['filename_format'] = "'storperf.$jobnum.$filenum'" self.setup() for key, value in self.options.iteritems(): - args.append('--' + key + "=" + value) + if value is not None: + args.append('--' + key + "=" + str(value)) + else: + args.append('--' + key) if parse_only: args.append('--parse-only') diff --git a/docker/storperf-master/storperf/workloads/_custom_workload.py b/docker/storperf-master/storperf/workloads/_custom_workload.py index 9e0100d..5cd37b3 100644 --- a/docker/storperf-master/storperf/workloads/_custom_workload.py +++ b/docker/storperf-master/storperf/workloads/_custom_workload.py @@ -18,12 +18,12 @@ class _custom_workload(_base_workload._base_workload): self.default_filesize = "1G" self.filename = '/dev/vdb' self.fixed_options = { - 'loops': '200', 'output-format': 'json', 'status-interval': '60' } self.options = { 'ioengine': 'libaio', + 'loops': '200', 'direct': '1', 'numjobs': '1', 'rw': 'read', |