author    mbeierl <mark.beierl@dell.com>    2018-10-24 17:05:37 -0400
committer    mbeierl <mark.beierl@dell.com>    2018-10-29 11:02:00 -0400
commit    bf29ec6e9a5f742d71e7d5cafe009b7223f46782 (patch)
tree    bd9cee239de2c4aa4982c27864807385fd4cd61f /docker/storperf-master/storperf/storperf_master.py
parent    518988ca97031ef0e64e15db5ec2d8f2b86d49e9 (diff)
Add Stackless Support
Adds the ability to specify an IP address list instead of looking up the full list from OpenStack Heat. Adds a stackless mode that bypasses OpenStack Heat altogether when running against bare metal or other nodes that are not under Heat's domain. Adds the ability to create filesystems and mount them for profiling. Adds the number of jobs and the number of files to create to the initializations API, so files can be laid down and filled with random data ahead of the actual performance run.

Change-Id: Ia787f8b863bc92b38dd29b3cf17eda0d48f3bcd5
JIRA: STORPERF-265
Signed-off-by: mbeierl <mark.beierl@dell.com>
Diffstat (limited to 'docker/storperf-master/storperf/storperf_master.py')
-rw-r--r--    docker/storperf-master/storperf/storperf_master.py    175
1 file changed, 144 insertions(+), 31 deletions(-)
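
For orientation, here is a minimal, hypothetical sketch of how the stackless fields introduced by this change could be driven from Python. The attribute names come from the diff below; the concrete values and the execute_workloads() entry point are assumptions for illustration only, not part of this commit.

from storperf.storperf_master import StorPerfMaster

master = StorPerfMaster()

# Setting stack_name to None flips the master into stackless mode,
# so OpenStack Heat is never queried for agent addresses.
master.stack_name = None

# Point StorPerf directly at pre-existing (e.g. bare-metal) nodes.
master.slave_addresses = ['10.0.0.11', '10.0.0.12']    # example addresses

# Credentials used by paramiko in _setup_slave(); password or ssh_key.
master.username = 'storperf'
master.password = 'secret'                              # example value

# Optional: build and mount a filesystem on each slave before profiling.
master.mkfs = 'ext4'                 # runs "sudo mkfs.ext4 /dev/vdb" via _mkfs()
master.mount_device = '/dev/vdb'
master.filename = '/mnt/storperf'    # mount point used by _mount()

# The run entry point is outside this hunk; it is assumed to resemble:
# job_id = master.execute_workloads(metadata={})
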
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index 0c7e559..76c4807 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -7,25 +7,26 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import StringIO
from datetime import datetime
+import json
import logging
+from multiprocessing.pool import ThreadPool
import os
import socket
-from threading import Thread
from time import sleep
+import uuid
import paramiko
from scp import SCPClient
-
from snaps.config.stack import StackConfig
from snaps.openstack.create_stack import OpenStackHeatStack
from snaps.openstack.os_credentials import OSCreds
from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils
from snaps.thread_utils import worker_pool
+
from storperf.db.job_db import JobDB
from storperf.test_executor import TestExecutor
-import json
-import uuid
class ParameterError(Exception):
@@ -37,8 +38,9 @@ class StorPerfMaster(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.reset_values()
+
self.job_db = JobDB()
- self._stack_name = 'StorPerfAgentGroup'
self.stack_settings = StackConfig(
name=self.stack_name,
template_path='storperf/resources/hot/agent-group.yaml')
@@ -59,21 +61,26 @@ class StorPerfMaster(object):
self.heat_stack = OpenStackHeatStack(self.os_creds,
self.stack_settings)
+
+ self._cached_stack_id = None
+ self._last_snaps_check_time = None
+ self._snaps_pool = worker_pool(20)
+
+ def reset_values(self):
+ self._stack_name = 'StorPerfAgentGroup'
self.username = None
self.password = None
+ self._ssh_key = None
self._test_executor = None
self._agent_count = 1
- self._agent_image = "Ubuntu 14.04"
- self._agent_flavor = "storperf"
+ self._agent_image = None
+ self._agent_flavor = None
self._availability_zone = None
self._public_network = None
self._volume_count = 1
self._volume_size = 1
self._volume_type = None
- self._cached_stack_id = None
- self._last_snaps_check_time = None
self._slave_addresses = []
- self._thread_pool = worker_pool(20)
self._filename = None
self._deadline = None
self._steady_state_samples = 10
@@ -83,6 +90,9 @@ class StorPerfMaster(object):
self._custom_workloads = []
self._subnet_CIDR = '172.16.0.0/16'
self.slave_info = {}
+ self.stackless = False
+ self.mkfs = None
+ self.mount_device = None
@property
def volume_count(self):
@@ -126,10 +136,14 @@ class StorPerfMaster(object):
@stack_name.setter
def stack_name(self, value):
- self._stack_name = value
- self.stack_settings.name = self.stack_name
- self.stack_id = None
- self._last_snaps_check_time = None
+ if value is None:
+ self.stackless = True
+ else:
+ self.stackless = False
+ self._stack_name = value
+ self.stack_settings.name = self.stack_name
+ self.stack_id = None
+ self._last_snaps_check_time = None
@property
def subnet_CIDR(self):
@@ -194,6 +208,10 @@ class StorPerfMaster(object):
def slave_addresses(self):
return self._slave_addresses
+ @slave_addresses.setter
+ def slave_addresses(self, value):
+ self._slave_addresses = value
+
@property
def stack_id(self):
self._get_stack_info()
@@ -204,6 +222,10 @@ class StorPerfMaster(object):
self._cached_stack_id = value
def _get_stack_info(self):
+ if self.stackless:
+ self._cached_stack_id = None
+ return None
+
if self._last_snaps_check_time is not None:
time_since_check = datetime.now() - self._last_snaps_check_time
if time_since_check.total_seconds() < 60:
@@ -216,7 +238,7 @@ class StorPerfMaster(object):
cinder_cli = cinder_utils.cinder_client(self.os_creds)
glance_cli = glance_utils.glance_client(self.os_creds)
- router_worker = self._thread_pool.apply_async(
+ router_worker = self._snaps_pool.apply_async(
self.heat_stack.get_router_creators)
vm_inst_creators = self.heat_stack.get_vm_inst_creators()
@@ -234,7 +256,7 @@ class StorPerfMaster(object):
server = vm1.get_vm_inst()
- image_worker = self._thread_pool.apply_async(
+ image_worker = self._snaps_pool.apply_async(
glance_utils.get_image_by_id, (glance_cli, server.image_id))
self._volume_count = len(server.volume_ids)
@@ -340,6 +362,19 @@ class StorPerfMaster(object):
self._custom_workloads = value
@property
+ def ssh_key(self):
+ if self._ssh_key is None:
+ return None
+ key = StringIO.StringIO(self._ssh_key)
+ pkey = paramiko.RSAKey.from_private_key(key)
+ key.close()
+ return pkey
+
+ @ssh_key.setter
+ def ssh_key(self, value):
+ self._ssh_key = value
+
+ @property
def is_stack_created(self):
return (self.stack_id is not None and
(self.heat_stack.get_status() == u'CREATE_COMPLETE' or
@@ -363,6 +398,8 @@ class StorPerfMaster(object):
return logs
def create_stack(self):
+ self.stackless = False
+
self.stack_settings.resource_files = [
'storperf/resources/hot/storperf-agent.yaml',
'storperf/resources/hot/storperf-volume.yaml']
@@ -422,7 +459,8 @@ class StorPerfMaster(object):
raise Exception("ERROR: Job {} is already running".format(
self._test_executor.job_id))
- if (self.stack_id is None):
+ if (not self.stackless and
+ self.stack_id is None):
raise ParameterError("ERROR: Stack %s does not exist" %
self.stack_name)
@@ -438,20 +476,23 @@ class StorPerfMaster(object):
slaves = self._slave_addresses
- setup_threads = []
+ setup_pool = ThreadPool(processes=len(slaves))
+ workers = []
for slave in slaves:
- t = Thread(target=self._setup_slave, args=(slave,))
- setup_threads.append(t)
- t.start()
+ worker = setup_pool.apply_async(
+ self._setup_slave, (slave,))
+ workers.append(worker)
- for thread in setup_threads:
- thread.join()
+ for worker in workers:
+ worker.get()
+
+ setup_pool.close()
self._test_executor.slaves = slaves
self._test_executor.volume_count = self.volume_count
params = metadata
- params['agent_count'] = self.agent_count
+ params['agent_count'] = len(slaves)
params['agent_flavor'] = self.agent_flavor
params['agent_image'] = self.agent_image
params['agent_info'] = json.dumps(self.slave_info)
@@ -466,9 +507,12 @@ class StorPerfMaster(object):
params['volume_count'] = self.volume_count
params['volume_size'] = self.volume_size
params['volume_type'] = self.volume_type
- if self.username and self.password:
+ if self.username:
params['username'] = self.username
+ if self.password:
params['password'] = self.password
+ if self.ssh_key:
+ params['ssh_key'] = self.ssh_key
job_id = self._test_executor.execute(params)
self.slave_info = {}
@@ -552,13 +596,23 @@ class StorPerfMaster(object):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.username and self.password:
- ssh.connect(slave,
- username=self.username,
- password=self.password)
+ ssh.connect(
+ slave,
+ username=self.username,
+ password=self.password,
+ timeout=2)
+ elif self.username and self.ssh_key:
+ ssh.connect(
+ slave,
+ username=self.username,
+ pkey=self.ssh_key,
+ timeout=2)
else:
- ssh.connect(slave, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ ssh.connect(
+ slave,
+ username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
uname = self._get_uname(ssh)
logger.debug("Slave uname is %s" % uname)
@@ -582,6 +636,12 @@ class StorPerfMaster(object):
logger.debug("Transferring fio to %s" % slave)
scp.put('/usr/local/bin/fio', '~/')
+ if self.mkfs is not None:
+ self._mkfs(ssh, logger)
+
+ if self.mount_device is not None:
+ self._mount(ssh, logger)
+
def _get_uname(self, ssh):
(_, stdout, _) = ssh.exec_command("uname -a")
return stdout.readline()
@@ -594,6 +654,59 @@ class StorPerfMaster(object):
available = lines[3]
return int(available)
+ def _mkfs(self, ssh, logger):
+ command = "sudo umount %s" % (self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Error executing on {0}: {1}".format(
+ command, error_messages))
+
+ def _mount(self, ssh, logger):
+ command = "sudo mkdir -p %s" % (self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mount %s %s" % (self.mount_device, self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Could not mount {0}: {1}".format(
+ self.mount_device, error_messages))
+
def _resize_root_fs(self, ssh, logger):
command = "sudo /usr/sbin/resize2fs /dev/vda1"
logger.info("Attempting %s" % command)