diff options
author | mbeierl <mark.beierl@dell.com> | 2018-10-24 17:05:37 -0400 |
---|---|---|
committer | mbeierl <mark.beierl@dell.com> | 2018-10-29 11:02:00 -0400 |
commit | bf29ec6e9a5f742d71e7d5cafe009b7223f46782 (patch) | |
tree | bd9cee239de2c4aa4982c27864807385fd4cd61f /docker/storperf-master/storperf/storperf_master.py | |
parent | 518988ca97031ef0e64e15db5ec2d8f2b86d49e9 (diff) |
Add Stackless Support
Adds ability to specify IP address list instead of looking up
full list from OpenStack Heat.
Adds stackless mode to bypass OpenStack heat altogether if
we are running against bare metal or other nodes that are
not under Heat's domain.
Adds ability to create filesystems and mount them for profiling.
Adds number of jobs and number of files to create to the
initializations API so we can lay down files and fill them
with random data ahead of the actual performance run.
Change-Id: Ia787f8b863bc92b38dd29b3cf17eda0d48f3bcd5
JIRA: STORPERF-265
Signed-off-by: mbeierl <mark.beierl@dell.com>
Diffstat (limited to 'docker/storperf-master/storperf/storperf_master.py')
-rw-r--r-- | docker/storperf-master/storperf/storperf_master.py | 175 |
1 files changed, 144 insertions, 31 deletions
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index 0c7e559..76c4807 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -7,25 +7,26 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import StringIO from datetime import datetime +import json import logging +from multiprocessing.pool import ThreadPool import os import socket -from threading import Thread from time import sleep +import uuid import paramiko from scp import SCPClient - from snaps.config.stack import StackConfig from snaps.openstack.create_stack import OpenStackHeatStack from snaps.openstack.os_credentials import OSCreds from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils from snaps.thread_utils import worker_pool + from storperf.db.job_db import JobDB from storperf.test_executor import TestExecutor -import json -import uuid class ParameterError(Exception): @@ -37,8 +38,9 @@ class StorPerfMaster(object): def __init__(self): self.logger = logging.getLogger(__name__) + self.reset_values() + self.job_db = JobDB() - self._stack_name = 'StorPerfAgentGroup' self.stack_settings = StackConfig( name=self.stack_name, template_path='storperf/resources/hot/agent-group.yaml') @@ -59,21 +61,26 @@ class StorPerfMaster(object): self.heat_stack = OpenStackHeatStack(self.os_creds, self.stack_settings) + + self._cached_stack_id = None + self._last_snaps_check_time = None + self._snaps_pool = worker_pool(20) + + def reset_values(self): + self._stack_name = 'StorPerfAgentGroup' self.username = None self.password = None + self._ssh_key = None self._test_executor = None self._agent_count = 1 - self._agent_image = "Ubuntu 14.04" - self._agent_flavor = "storperf" + self._agent_image = None + self._agent_flavor = None self._availability_zone = None self._public_network = None self._volume_count = 1 self._volume_size = 1 self._volume_type = None - self._cached_stack_id = None - self._last_snaps_check_time = None self._slave_addresses = [] - self._thread_pool = worker_pool(20) self._filename = None self._deadline = None self._steady_state_samples = 10 @@ -83,6 +90,9 @@ class StorPerfMaster(object): self._custom_workloads = [] self._subnet_CIDR = '172.16.0.0/16' self.slave_info = {} + self.stackless = False + self.mkfs = None + self.mount_device = None @property def volume_count(self): @@ -126,10 +136,14 @@ class StorPerfMaster(object): @stack_name.setter def stack_name(self, value): - self._stack_name = value - self.stack_settings.name = self.stack_name - self.stack_id = None - self._last_snaps_check_time = None + if value is None: + self.stackless = True + else: + self.stackless = False + self._stack_name = value + self.stack_settings.name = self.stack_name + self.stack_id = None + self._last_snaps_check_time = None @property def subnet_CIDR(self): @@ -194,6 +208,10 @@ class StorPerfMaster(object): def slave_addresses(self): return self._slave_addresses + @slave_addresses.setter + def slave_addresses(self, value): + self._slave_addresses = value + @property def stack_id(self): self._get_stack_info() @@ -204,6 +222,10 @@ class StorPerfMaster(object): self._cached_stack_id = value def _get_stack_info(self): + if self.stackless: + self._cached_stack_id = None + return None + if self._last_snaps_check_time is not None: time_since_check = datetime.now() - self._last_snaps_check_time if time_since_check.total_seconds() < 60: @@ -216,7 +238,7 @@ class StorPerfMaster(object): cinder_cli = cinder_utils.cinder_client(self.os_creds) glance_cli = glance_utils.glance_client(self.os_creds) - router_worker = self._thread_pool.apply_async( + router_worker = self._snaps_pool.apply_async( self.heat_stack.get_router_creators) vm_inst_creators = self.heat_stack.get_vm_inst_creators() @@ -234,7 +256,7 @@ class StorPerfMaster(object): server = vm1.get_vm_inst() - image_worker = self._thread_pool.apply_async( + image_worker = self._snaps_pool.apply_async( glance_utils.get_image_by_id, (glance_cli, server.image_id)) self._volume_count = len(server.volume_ids) @@ -340,6 +362,19 @@ class StorPerfMaster(object): self._custom_workloads = value @property + def ssh_key(self): + if self._ssh_key is None: + return None + key = StringIO.StringIO(self._ssh_key) + pkey = paramiko.RSAKey.from_private_key(key) + key.close() + return pkey + + @ssh_key.setter + def ssh_key(self, value): + self._ssh_key = value + + @property def is_stack_created(self): return (self.stack_id is not None and (self.heat_stack.get_status() == u'CREATE_COMPLETE' or @@ -363,6 +398,8 @@ class StorPerfMaster(object): return logs def create_stack(self): + self.stackless = False + self.stack_settings.resource_files = [ 'storperf/resources/hot/storperf-agent.yaml', 'storperf/resources/hot/storperf-volume.yaml'] @@ -422,7 +459,8 @@ class StorPerfMaster(object): raise Exception("ERROR: Job {} is already running".format( self._test_executor.job_id)) - if (self.stack_id is None): + if (not self.stackless and + self.stack_id is None): raise ParameterError("ERROR: Stack %s does not exist" % self.stack_name) @@ -438,20 +476,23 @@ class StorPerfMaster(object): slaves = self._slave_addresses - setup_threads = [] + setup_pool = ThreadPool(processes=len(slaves)) + workers = [] for slave in slaves: - t = Thread(target=self._setup_slave, args=(slave,)) - setup_threads.append(t) - t.start() + worker = setup_pool.apply_async( + self._setup_slave, (slave,)) + workers.append(worker) - for thread in setup_threads: - thread.join() + for worker in workers: + worker.get() + + setup_pool.close() self._test_executor.slaves = slaves self._test_executor.volume_count = self.volume_count params = metadata - params['agent_count'] = self.agent_count + params['agent_count'] = len(slaves) params['agent_flavor'] = self.agent_flavor params['agent_image'] = self.agent_image params['agent_info'] = json.dumps(self.slave_info) @@ -466,9 +507,12 @@ class StorPerfMaster(object): params['volume_count'] = self.volume_count params['volume_size'] = self.volume_size params['volume_type'] = self.volume_type - if self.username and self.password: + if self.username: params['username'] = self.username + if self.password: params['password'] = self.password + if self.ssh_key: + params['ssh_key'] = self.ssh_key job_id = self._test_executor.execute(params) self.slave_info = {} @@ -552,13 +596,23 @@ class StorPerfMaster(object): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.username and self.password: - ssh.connect(slave, - username=self.username, - password=self.password) + ssh.connect( + slave, + username=self.username, + password=self.password, + timeout=2) + elif self.username and self.ssh_key: + ssh.connect( + slave, + username=self.username, + pkey=self.ssh_key, + timeout=2) else: - ssh.connect(slave, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh.connect( + slave, + username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) uname = self._get_uname(ssh) logger.debug("Slave uname is %s" % uname) @@ -582,6 +636,12 @@ class StorPerfMaster(object): logger.debug("Transferring fio to %s" % slave) scp.put('/usr/local/bin/fio', '~/') + if self.mkfs is not None: + self._mkfs(ssh, logger) + + if self.mount_device is not None: + self._mount(ssh, logger) + def _get_uname(self, ssh): (_, stdout, _) = ssh.exec_command("uname -a") return stdout.readline() @@ -594,6 +654,59 @@ class StorPerfMaster(object): available = lines[3] return int(available) + def _mkfs(self, ssh, logger): + command = "sudo umount %s" % (self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Error executing on {0}: {1}".format( + command, error_messages)) + + def _mount(self, ssh, logger): + command = "sudo mkdir -p %s" % (self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mount %s %s" % (self.mount_device, self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Could not mount {0}: {1}".format( + self.mount_device, error_messages)) + def _resize_root_fs(self, ssh, logger): command = "sudo /usr/sbin/resize2fs /dev/vda1" logger.info("Attempting %s" % command) |