diff options
Diffstat (limited to 'docker/storperf-master/storperf/storperf_master.py')
-rw-r--r-- | docker/storperf-master/storperf/storperf_master.py | 232 |
1 files changed, 192 insertions, 40 deletions
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index b17dae9..73f8f0d 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -7,16 +7,11 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from datetime import datetime -import logging -import os -import socket -from threading import Thread -from time import sleep -import paramiko +from datetime import datetime +from io import StringIO +from multiprocessing.pool import ThreadPool from scp import SCPClient - from snaps.config.stack import StackConfig from snaps.openstack.create_stack import OpenStackHeatStack from snaps.openstack.os_credentials import OSCreds @@ -24,7 +19,14 @@ from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils from snaps.thread_utils import worker_pool from storperf.db.job_db import JobDB from storperf.test_executor import TestExecutor +from storperf.utilities import ip_helper +from time import sleep import json +import logging +import os +import paramiko +import socket +import uuid class ParameterError(Exception): @@ -36,10 +38,11 @@ class StorPerfMaster(object): def __init__(self): self.logger = logging.getLogger(__name__) - self.job_db = JobDB() + self.reset_values() + self.job_db = JobDB() self.stack_settings = StackConfig( - name='StorPerfAgentGroup', + name=self.stack_name, template_path='storperf/resources/hot/agent-group.yaml') self.os_creds = OSCreds( @@ -58,29 +61,38 @@ class StorPerfMaster(object): self.heat_stack = OpenStackHeatStack(self.os_creds, self.stack_settings) + + self._snaps_pool = worker_pool(20) + + def reset_values(self): + self._stack_name = 'StorPerfAgentGroup' self.username = None self.password = None + self._ssh_key = None self._test_executor = None self._agent_count = 1 - self._agent_image = "Ubuntu 14.04" - self._agent_flavor = "storperf" + self._agent_image = None + self._agent_flavor = None self._availability_zone = None self._public_network = None self._volume_count = 1 self._volume_size = 1 self._volume_type = None - self._cached_stack_id = None - self._last_snaps_check_time = None self._slave_addresses = [] - self._thread_pool = worker_pool(20) self._filename = None self._deadline = None self._steady_state_samples = 10 - self._queue_depths = [1, 4, 8] - self._block_sizes = [512, 4096, 16384] + self._queue_depths = "1" + self._block_sizes = "4096" self._workload_modules = [] self._custom_workloads = [] + self._subnet_CIDR = '172.16.0.0/16' self.slave_info = {} + self.stackless = False + self.mkfs = None + self.mount_device = None + self._last_snaps_check_time = None + self._cached_stack_id = None @property def volume_count(self): @@ -119,6 +131,32 @@ class StorPerfMaster(object): self._volume_type = value @property + def stack_name(self): + return self._stack_name + + @stack_name.setter + def stack_name(self, value): + if value is None: + self.stackless = True + else: + self.stackless = False + self._stack_name = value + self.stack_settings.name = self.stack_name + self.stack_id = None + self._last_snaps_check_time = None + + @property + def subnet_CIDR(self): + return self._subnet_CIDR + + @subnet_CIDR.setter + def subnet_CIDR(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change subnet CIDR after stack is created") + self._subnet_CIDR = value + + @property def agent_count(self): self._get_stack_info() return self._agent_count @@ -170,6 +208,10 @@ class StorPerfMaster(object): def slave_addresses(self): return self._slave_addresses + @slave_addresses.setter + def slave_addresses(self, value): + self._slave_addresses = value + @property def stack_id(self): self._get_stack_info() @@ -180,6 +222,10 @@ class StorPerfMaster(object): self._cached_stack_id = value def _get_stack_info(self): + if self.stackless: + self._cached_stack_id = None + return None + if self._last_snaps_check_time is not None: time_since_check = datetime.now() - self._last_snaps_check_time if time_since_check.total_seconds() < 60: @@ -192,7 +238,7 @@ class StorPerfMaster(object): cinder_cli = cinder_utils.cinder_client(self.os_creds) glance_cli = glance_utils.glance_client(self.os_creds) - router_worker = self._thread_pool.apply_async( + router_worker = self._snaps_pool.apply_async( self.heat_stack.get_router_creators) vm_inst_creators = self.heat_stack.get_vm_inst_creators() @@ -210,7 +256,7 @@ class StorPerfMaster(object): server = vm1.get_vm_inst() - image_worker = self._thread_pool.apply_async( + image_worker = self._snaps_pool.apply_async( glance_utils.get_image_by_id, (glance_cli, server.image_id)) self._volume_count = len(server.volume_ids) @@ -316,6 +362,19 @@ class StorPerfMaster(object): self._custom_workloads = value @property + def ssh_key(self): + if self._ssh_key is None: + return None + key = StringIO(self._ssh_key) + pkey = paramiko.RSAKey.from_private_key(key) + key.close() + return pkey + + @ssh_key.setter + def ssh_key(self, value): + self._ssh_key = value + + @property def is_stack_created(self): return (self.stack_id is not None and (self.heat_stack.get_status() == u'CREATE_COMPLETE' or @@ -339,6 +398,8 @@ class StorPerfMaster(object): return logs def create_stack(self): + self.stackless = False + self.stack_settings.resource_files = [ 'storperf/resources/hot/storperf-agent.yaml', 'storperf/resources/hot/storperf-volume.yaml'] @@ -398,8 +459,10 @@ class StorPerfMaster(object): raise Exception("ERROR: Job {} is already running".format( self._test_executor.job_id)) - if (self.stack_id is None): - raise ParameterError("ERROR: Stack does not exist") + if (not self.stackless and + self.stack_id is None): + raise ParameterError("ERROR: Stack %s does not exist" % + self.stack_name) self._test_executor = TestExecutor() self._test_executor.register(self.executor_event) @@ -413,30 +476,43 @@ class StorPerfMaster(object): slaves = self._slave_addresses - setup_threads = [] + setup_pool = ThreadPool(processes=len(slaves)) + workers = [] for slave in slaves: - t = Thread(target=self._setup_slave, args=(slave,)) - setup_threads.append(t) - t.start() + worker = setup_pool.apply_async( + self._setup_slave, (slave,)) + workers.append(worker) - for thread in setup_threads: - thread.join() + for worker in workers: + worker.get() + + setup_pool.close() self._test_executor.slaves = slaves self._test_executor.volume_count = self.volume_count - params = metadata - params['agent_count'] = self.agent_count + params['agent_count'] = len(slaves) + params['agent_flavor'] = self.agent_flavor + params['agent_image'] = self.agent_image + params['agent_info'] = json.dumps(self.slave_info) + params['avaiability_zone'] = self.availability_zone + params['block_sizes'] = self.block_sizes + params['deadline'] = self.deadline params['public_network'] = self.public_network + params['stack_name'] = self.stack_name + params['steady_state_samples'] = self.steady_state_samples + params['subnet_CIDR'] = self.subnet_CIDR + params['target'] = self.filename params['volume_count'] = self.volume_count params['volume_size'] = self.volume_size - params['agent_info'] = json.dumps(self.slave_info) - if self.volume_type is not None: - params['volume_type'] = self.volume_type - if self.username and self.password: + params['volume_type'] = self.volume_type + if self.username: params['username'] = self.username + if self.password: params['password'] = self.password + if self.ssh_key: + params['ssh_key'] = self.ssh_key job_id = self._test_executor.execute(params) self.slave_info = {} @@ -503,7 +579,8 @@ class StorPerfMaster(object): timer = 10 while not alive: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - result = s.connect_ex((slave, 22)) + host, port = ip_helper.parse_address_and_port(slave) + result = s.connect_ex((host, port)) s.close() if result: @@ -520,13 +597,26 @@ class StorPerfMaster(object): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.username and self.password: - ssh.connect(slave, - username=self.username, - password=self.password) + ssh.connect( + host, + port=port, + username=self.username, + password=self.password, + timeout=2) + elif self.username and self.ssh_key: + ssh.connect( + host, + port=port, + username=self.username, + pkey=self.ssh_key, + timeout=2) else: - ssh.connect(slave, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh.connect( + slave, + port=port, + username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) uname = self._get_uname(ssh) logger.debug("Slave uname is %s" % uname) @@ -550,6 +640,12 @@ class StorPerfMaster(object): logger.debug("Transferring fio to %s" % slave) scp.put('/usr/local/bin/fio', '~/') + if self.mkfs is not None: + self._mkfs(ssh, logger) + + if self.mount_device is not None: + self._mount(ssh, logger) + def _get_uname(self, ssh): (_, stdout, _) = ssh.exec_command("uname -a") return stdout.readline() @@ -562,6 +658,59 @@ class StorPerfMaster(object): available = lines[3] return int(available) + def _mkfs(self, ssh, logger): + command = "sudo umount %s" % (self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Error executing on {0}: {1}".format( + command, error_messages)) + + def _mount(self, ssh, logger): + command = "sudo mkdir -p %s" % (self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mount %s %s" % (self.mount_device, self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Could not mount {0}: {1}".format( + self.mount_device, error_messages)) + def _resize_root_fs(self, ssh, logger): command = "sudo /usr/sbin/resize2fs /dev/vda1" logger.info("Attempting %s" % command) @@ -573,11 +722,14 @@ class StorPerfMaster(object): logger.error(line) def _make_parameters(self): + random_str = uuid.uuid4().hex[:6].upper() heat_parameters = {} heat_parameters['public_network'] = self.public_network heat_parameters['agent_count'] = self.agent_count heat_parameters['volume_count'] = self.volume_count heat_parameters['volume_size'] = self.volume_size + heat_parameters['keypair_name'] = 'storperf_agent_keypair' + random_str + heat_parameters['subnet_CIDR'] = self.subnet_CIDR if self.volume_type is not None: heat_parameters['volume_type'] = self.volume_type heat_parameters['agent_image'] = self.agent_image |