summaryrefslogtreecommitdiffstats
path: root/docker/storperf-master/storperf/storperf_master.py
diff options
context:
space:
mode:
Diffstat (limited to 'docker/storperf-master/storperf/storperf_master.py')
-rw-r--r--docker/storperf-master/storperf/storperf_master.py232
1 files changed, 192 insertions, 40 deletions
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index b17dae9..73f8f0d 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -7,16 +7,11 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from datetime import datetime
-import logging
-import os
-import socket
-from threading import Thread
-from time import sleep
-import paramiko
+from datetime import datetime
+from io import StringIO
+from multiprocessing.pool import ThreadPool
from scp import SCPClient
-
from snaps.config.stack import StackConfig
from snaps.openstack.create_stack import OpenStackHeatStack
from snaps.openstack.os_credentials import OSCreds
@@ -24,7 +19,14 @@ from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils
from snaps.thread_utils import worker_pool
from storperf.db.job_db import JobDB
from storperf.test_executor import TestExecutor
+from storperf.utilities import ip_helper
+from time import sleep
import json
+import logging
+import os
+import paramiko
+import socket
+import uuid
class ParameterError(Exception):
@@ -36,10 +38,11 @@ class StorPerfMaster(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
- self.job_db = JobDB()
+ self.reset_values()
+ self.job_db = JobDB()
self.stack_settings = StackConfig(
- name='StorPerfAgentGroup',
+ name=self.stack_name,
template_path='storperf/resources/hot/agent-group.yaml')
self.os_creds = OSCreds(
@@ -58,29 +61,38 @@ class StorPerfMaster(object):
self.heat_stack = OpenStackHeatStack(self.os_creds,
self.stack_settings)
+
+ self._snaps_pool = worker_pool(20)
+
+ def reset_values(self):
+ self._stack_name = 'StorPerfAgentGroup'
self.username = None
self.password = None
+ self._ssh_key = None
self._test_executor = None
self._agent_count = 1
- self._agent_image = "Ubuntu 14.04"
- self._agent_flavor = "storperf"
+ self._agent_image = None
+ self._agent_flavor = None
self._availability_zone = None
self._public_network = None
self._volume_count = 1
self._volume_size = 1
self._volume_type = None
- self._cached_stack_id = None
- self._last_snaps_check_time = None
self._slave_addresses = []
- self._thread_pool = worker_pool(20)
self._filename = None
self._deadline = None
self._steady_state_samples = 10
- self._queue_depths = [1, 4, 8]
- self._block_sizes = [512, 4096, 16384]
+ self._queue_depths = "1"
+ self._block_sizes = "4096"
self._workload_modules = []
self._custom_workloads = []
+ self._subnet_CIDR = '172.16.0.0/16'
self.slave_info = {}
+ self.stackless = False
+ self.mkfs = None
+ self.mount_device = None
+ self._last_snaps_check_time = None
+ self._cached_stack_id = None
@property
def volume_count(self):
@@ -119,6 +131,32 @@ class StorPerfMaster(object):
self._volume_type = value
@property
+ def stack_name(self):
+ return self._stack_name
+
+ @stack_name.setter
+ def stack_name(self, value):
+ if value is None:
+ self.stackless = True
+ else:
+ self.stackless = False
+ self._stack_name = value
+ self.stack_settings.name = self.stack_name
+ self.stack_id = None
+ self._last_snaps_check_time = None
+
+ @property
+ def subnet_CIDR(self):
+ return self._subnet_CIDR
+
+ @subnet_CIDR.setter
+ def subnet_CIDR(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change subnet CIDR after stack is created")
+ self._subnet_CIDR = value
+
+ @property
def agent_count(self):
self._get_stack_info()
return self._agent_count
@@ -170,6 +208,10 @@ class StorPerfMaster(object):
def slave_addresses(self):
return self._slave_addresses
+ @slave_addresses.setter
+ def slave_addresses(self, value):
+ self._slave_addresses = value
+
@property
def stack_id(self):
self._get_stack_info()
@@ -180,6 +222,10 @@ class StorPerfMaster(object):
self._cached_stack_id = value
def _get_stack_info(self):
+ if self.stackless:
+ self._cached_stack_id = None
+ return None
+
if self._last_snaps_check_time is not None:
time_since_check = datetime.now() - self._last_snaps_check_time
if time_since_check.total_seconds() < 60:
@@ -192,7 +238,7 @@ class StorPerfMaster(object):
cinder_cli = cinder_utils.cinder_client(self.os_creds)
glance_cli = glance_utils.glance_client(self.os_creds)
- router_worker = self._thread_pool.apply_async(
+ router_worker = self._snaps_pool.apply_async(
self.heat_stack.get_router_creators)
vm_inst_creators = self.heat_stack.get_vm_inst_creators()
@@ -210,7 +256,7 @@ class StorPerfMaster(object):
server = vm1.get_vm_inst()
- image_worker = self._thread_pool.apply_async(
+ image_worker = self._snaps_pool.apply_async(
glance_utils.get_image_by_id, (glance_cli, server.image_id))
self._volume_count = len(server.volume_ids)
@@ -316,6 +362,19 @@ class StorPerfMaster(object):
self._custom_workloads = value
@property
+ def ssh_key(self):
+ if self._ssh_key is None:
+ return None
+ key = StringIO(self._ssh_key)
+ pkey = paramiko.RSAKey.from_private_key(key)
+ key.close()
+ return pkey
+
+ @ssh_key.setter
+ def ssh_key(self, value):
+ self._ssh_key = value
+
+ @property
def is_stack_created(self):
return (self.stack_id is not None and
(self.heat_stack.get_status() == u'CREATE_COMPLETE' or
@@ -339,6 +398,8 @@ class StorPerfMaster(object):
return logs
def create_stack(self):
+ self.stackless = False
+
self.stack_settings.resource_files = [
'storperf/resources/hot/storperf-agent.yaml',
'storperf/resources/hot/storperf-volume.yaml']
@@ -398,8 +459,10 @@ class StorPerfMaster(object):
raise Exception("ERROR: Job {} is already running".format(
self._test_executor.job_id))
- if (self.stack_id is None):
- raise ParameterError("ERROR: Stack does not exist")
+ if (not self.stackless and
+ self.stack_id is None):
+ raise ParameterError("ERROR: Stack %s does not exist" %
+ self.stack_name)
self._test_executor = TestExecutor()
self._test_executor.register(self.executor_event)
@@ -413,30 +476,43 @@ class StorPerfMaster(object):
slaves = self._slave_addresses
- setup_threads = []
+ setup_pool = ThreadPool(processes=len(slaves))
+ workers = []
for slave in slaves:
- t = Thread(target=self._setup_slave, args=(slave,))
- setup_threads.append(t)
- t.start()
+ worker = setup_pool.apply_async(
+ self._setup_slave, (slave,))
+ workers.append(worker)
- for thread in setup_threads:
- thread.join()
+ for worker in workers:
+ worker.get()
+
+ setup_pool.close()
self._test_executor.slaves = slaves
self._test_executor.volume_count = self.volume_count
-
params = metadata
- params['agent_count'] = self.agent_count
+ params['agent_count'] = len(slaves)
+ params['agent_flavor'] = self.agent_flavor
+ params['agent_image'] = self.agent_image
+ params['agent_info'] = json.dumps(self.slave_info)
+ params['avaiability_zone'] = self.availability_zone
+ params['block_sizes'] = self.block_sizes
+ params['deadline'] = self.deadline
params['public_network'] = self.public_network
+ params['stack_name'] = self.stack_name
+ params['steady_state_samples'] = self.steady_state_samples
+ params['subnet_CIDR'] = self.subnet_CIDR
+ params['target'] = self.filename
params['volume_count'] = self.volume_count
params['volume_size'] = self.volume_size
- params['agent_info'] = json.dumps(self.slave_info)
- if self.volume_type is not None:
- params['volume_type'] = self.volume_type
- if self.username and self.password:
+ params['volume_type'] = self.volume_type
+ if self.username:
params['username'] = self.username
+ if self.password:
params['password'] = self.password
+ if self.ssh_key:
+ params['ssh_key'] = self.ssh_key
job_id = self._test_executor.execute(params)
self.slave_info = {}
@@ -503,7 +579,8 @@ class StorPerfMaster(object):
timer = 10
while not alive:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- result = s.connect_ex((slave, 22))
+ host, port = ip_helper.parse_address_and_port(slave)
+ result = s.connect_ex((host, port))
s.close()
if result:
@@ -520,13 +597,26 @@ class StorPerfMaster(object):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.username and self.password:
- ssh.connect(slave,
- username=self.username,
- password=self.password)
+ ssh.connect(
+ host,
+ port=port,
+ username=self.username,
+ password=self.password,
+ timeout=2)
+ elif self.username and self.ssh_key:
+ ssh.connect(
+ host,
+ port=port,
+ username=self.username,
+ pkey=self.ssh_key,
+ timeout=2)
else:
- ssh.connect(slave, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ ssh.connect(
+ slave,
+ port=port,
+ username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
uname = self._get_uname(ssh)
logger.debug("Slave uname is %s" % uname)
@@ -550,6 +640,12 @@ class StorPerfMaster(object):
logger.debug("Transferring fio to %s" % slave)
scp.put('/usr/local/bin/fio', '~/')
+ if self.mkfs is not None:
+ self._mkfs(ssh, logger)
+
+ if self.mount_device is not None:
+ self._mount(ssh, logger)
+
def _get_uname(self, ssh):
(_, stdout, _) = ssh.exec_command("uname -a")
return stdout.readline()
@@ -562,6 +658,59 @@ class StorPerfMaster(object):
available = lines[3]
return int(available)
+ def _mkfs(self, ssh, logger):
+ command = "sudo umount %s" % (self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Error executing on {0}: {1}".format(
+ command, error_messages))
+
+ def _mount(self, ssh, logger):
+ command = "sudo mkdir -p %s" % (self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mount %s %s" % (self.mount_device, self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Could not mount {0}: {1}".format(
+ self.mount_device, error_messages))
+
def _resize_root_fs(self, ssh, logger):
command = "sudo /usr/sbin/resize2fs /dev/vda1"
logger.info("Attempting %s" % command)
@@ -573,11 +722,14 @@ class StorPerfMaster(object):
logger.error(line)
def _make_parameters(self):
+ random_str = uuid.uuid4().hex[:6].upper()
heat_parameters = {}
heat_parameters['public_network'] = self.public_network
heat_parameters['agent_count'] = self.agent_count
heat_parameters['volume_count'] = self.volume_count
heat_parameters['volume_size'] = self.volume_size
+ heat_parameters['keypair_name'] = 'storperf_agent_keypair' + random_str
+ heat_parameters['subnet_CIDR'] = self.subnet_CIDR
if self.volume_type is not None:
heat_parameters['volume_type'] = self.volume_type
heat_parameters['agent_image'] = self.agent_image