diff options
Diffstat (limited to 'docker/storperf-master/storperf/storperf_master.py')
-rw-r--r-- | docker/storperf-master/storperf/storperf_master.py | 175 |
1 files changed, 144 insertions, 31 deletions
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index 0c7e559..76c4807 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -7,25 +7,26 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import StringIO from datetime import datetime +import json import logging +from multiprocessing.pool import ThreadPool import os import socket -from threading import Thread from time import sleep +import uuid import paramiko from scp import SCPClient - from snaps.config.stack import StackConfig from snaps.openstack.create_stack import OpenStackHeatStack from snaps.openstack.os_credentials import OSCreds from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils from snaps.thread_utils import worker_pool + from storperf.db.job_db import JobDB from storperf.test_executor import TestExecutor -import json -import uuid class ParameterError(Exception): @@ -37,8 +38,9 @@ class StorPerfMaster(object): def __init__(self): self.logger = logging.getLogger(__name__) + self.reset_values() + self.job_db = JobDB() - self._stack_name = 'StorPerfAgentGroup' self.stack_settings = StackConfig( name=self.stack_name, template_path='storperf/resources/hot/agent-group.yaml') @@ -59,21 +61,26 @@ class StorPerfMaster(object): self.heat_stack = OpenStackHeatStack(self.os_creds, self.stack_settings) + + self._cached_stack_id = None + self._last_snaps_check_time = None + self._snaps_pool = worker_pool(20) + + def reset_values(self): + self._stack_name = 'StorPerfAgentGroup' self.username = None self.password = None + self._ssh_key = None self._test_executor = None self._agent_count = 1 - self._agent_image = "Ubuntu 14.04" - self._agent_flavor = "storperf" + self._agent_image = None + self._agent_flavor = None self._availability_zone = None self._public_network = None self._volume_count = 1 self._volume_size = 1 self._volume_type = None - self._cached_stack_id = None - self._last_snaps_check_time = None self._slave_addresses = [] - self._thread_pool = worker_pool(20) self._filename = None self._deadline = None self._steady_state_samples = 10 @@ -83,6 +90,9 @@ class StorPerfMaster(object): self._custom_workloads = [] self._subnet_CIDR = '172.16.0.0/16' self.slave_info = {} + self.stackless = False + self.mkfs = None + self.mount_device = None @property def volume_count(self): @@ -126,10 +136,14 @@ class StorPerfMaster(object): @stack_name.setter def stack_name(self, value): - self._stack_name = value - self.stack_settings.name = self.stack_name - self.stack_id = None - self._last_snaps_check_time = None + if value is None: + self.stackless = True + else: + self.stackless = False + self._stack_name = value + self.stack_settings.name = self.stack_name + self.stack_id = None + self._last_snaps_check_time = None @property def subnet_CIDR(self): @@ -194,6 +208,10 @@ class StorPerfMaster(object): def slave_addresses(self): return self._slave_addresses + @slave_addresses.setter + def slave_addresses(self, value): + self._slave_addresses = value + @property def stack_id(self): self._get_stack_info() @@ -204,6 +222,10 @@ class StorPerfMaster(object): self._cached_stack_id = value def _get_stack_info(self): + if self.stackless: + self._cached_stack_id = None + return None + if self._last_snaps_check_time is not None: time_since_check = datetime.now() - self._last_snaps_check_time if time_since_check.total_seconds() < 60: @@ -216,7 +238,7 @@ class StorPerfMaster(object): cinder_cli = cinder_utils.cinder_client(self.os_creds) glance_cli = glance_utils.glance_client(self.os_creds) - router_worker = self._thread_pool.apply_async( + router_worker = self._snaps_pool.apply_async( self.heat_stack.get_router_creators) vm_inst_creators = self.heat_stack.get_vm_inst_creators() @@ -234,7 +256,7 @@ class StorPerfMaster(object): server = vm1.get_vm_inst() - image_worker = self._thread_pool.apply_async( + image_worker = self._snaps_pool.apply_async( glance_utils.get_image_by_id, (glance_cli, server.image_id)) self._volume_count = len(server.volume_ids) @@ -340,6 +362,19 @@ class StorPerfMaster(object): self._custom_workloads = value @property + def ssh_key(self): + if self._ssh_key is None: + return None + key = StringIO.StringIO(self._ssh_key) + pkey = paramiko.RSAKey.from_private_key(key) + key.close() + return pkey + + @ssh_key.setter + def ssh_key(self, value): + self._ssh_key = value + + @property def is_stack_created(self): return (self.stack_id is not None and (self.heat_stack.get_status() == u'CREATE_COMPLETE' or @@ -363,6 +398,8 @@ class StorPerfMaster(object): return logs def create_stack(self): + self.stackless = False + self.stack_settings.resource_files = [ 'storperf/resources/hot/storperf-agent.yaml', 'storperf/resources/hot/storperf-volume.yaml'] @@ -422,7 +459,8 @@ class StorPerfMaster(object): raise Exception("ERROR: Job {} is already running".format( self._test_executor.job_id)) - if (self.stack_id is None): + if (not self.stackless and + self.stack_id is None): raise ParameterError("ERROR: Stack %s does not exist" % self.stack_name) @@ -438,20 +476,23 @@ class StorPerfMaster(object): slaves = self._slave_addresses - setup_threads = [] + setup_pool = ThreadPool(processes=len(slaves)) + workers = [] for slave in slaves: - t = Thread(target=self._setup_slave, args=(slave,)) - setup_threads.append(t) - t.start() + worker = setup_pool.apply_async( + self._setup_slave, (slave,)) + workers.append(worker) - for thread in setup_threads: - thread.join() + for worker in workers: + worker.get() + + setup_pool.close() self._test_executor.slaves = slaves self._test_executor.volume_count = self.volume_count params = metadata - params['agent_count'] = self.agent_count + params['agent_count'] = len(slaves) params['agent_flavor'] = self.agent_flavor params['agent_image'] = self.agent_image params['agent_info'] = json.dumps(self.slave_info) @@ -466,9 +507,12 @@ class StorPerfMaster(object): params['volume_count'] = self.volume_count params['volume_size'] = self.volume_size params['volume_type'] = self.volume_type - if self.username and self.password: + if self.username: params['username'] = self.username + if self.password: params['password'] = self.password + if self.ssh_key: + params['ssh_key'] = self.ssh_key job_id = self._test_executor.execute(params) self.slave_info = {} @@ -552,13 +596,23 @@ class StorPerfMaster(object): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.username and self.password: - ssh.connect(slave, - username=self.username, - password=self.password) + ssh.connect( + slave, + username=self.username, + password=self.password, + timeout=2) + elif self.username and self.ssh_key: + ssh.connect( + slave, + username=self.username, + pkey=self.ssh_key, + timeout=2) else: - ssh.connect(slave, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh.connect( + slave, + username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) uname = self._get_uname(ssh) logger.debug("Slave uname is %s" % uname) @@ -582,6 +636,12 @@ class StorPerfMaster(object): logger.debug("Transferring fio to %s" % slave) scp.put('/usr/local/bin/fio', '~/') + if self.mkfs is not None: + self._mkfs(ssh, logger) + + if self.mount_device is not None: + self._mount(ssh, logger) + def _get_uname(self, ssh): (_, stdout, _) = ssh.exec_command("uname -a") return stdout.readline() @@ -594,6 +654,59 @@ class StorPerfMaster(object): available = lines[3] return int(available) + def _mkfs(self, ssh, logger): + command = "sudo umount %s" % (self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Error executing on {0}: {1}".format( + command, error_messages)) + + def _mount(self, ssh, logger): + command = "sudo mkdir -p %s" % (self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mount %s %s" % (self.mount_device, self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Could not mount {0}: {1}".format( + self.mount_device, error_messages)) + def _resize_root_fs(self, ssh, logger): command = "sudo /usr/sbin/resize2fs /dev/vda1" logger.info("Attempting %s" % command) |