Diffstat (limited to 'docker/storperf-master/storperf')

-rw-r--r--  docker/storperf-master/storperf/carbon/converter.py                 |  10
-rw-r--r--  docker/storperf-master/storperf/carbon/emitter.py                   |   6
-rw-r--r--  docker/storperf-master/storperf/db/graphite_db.py                   |   2
-rw-r--r--  docker/storperf-master/storperf/db/job_db.py                        |   4
-rw-r--r--  docker/storperf-master/storperf/fio/fio_invoker.py                  |  36
-rw-r--r--  docker/storperf-master/storperf/resources/hot/agent-group.yaml      |  15
-rw-r--r--  docker/storperf-master/storperf/resources/hot/storperf-agent.yaml   |   2
-rw-r--r--  docker/storperf-master/storperf/resources/hot/storperf-volume.yaml  |   2
-rw-r--r--  docker/storperf-master/storperf/storperf_master.py                  | 232
-rw-r--r--  docker/storperf-master/storperf/test_executor.py                    |  29
-rw-r--r--  docker/storperf-master/storperf/utilities/data_handler.py           |  10
-rw-r--r--  docker/storperf-master/storperf/utilities/ip_helper.py              |  27
-rw-r--r--  docker/storperf-master/storperf/utilities/math.py                   |  67
-rw-r--r--  docker/storperf-master/storperf/utilities/steady_state.py           |  15
-rw-r--r--  docker/storperf-master/storperf/workloads/_base_workload.py         |  19
-rw-r--r--  docker/storperf-master/storperf/workloads/_custom_workload.py       |   2

16 files changed, 387 insertions(+), 91 deletions(-)
diff --git a/docker/storperf-master/storperf/carbon/converter.py b/docker/storperf-master/storperf/carbon/converter.py
index 623c144..4b5e6aa 100644
--- a/docker/storperf-master/storperf/carbon/converter.py
+++ b/docker/storperf-master/storperf/carbon/converter.py
@@ -32,12 +32,12 @@ class Converter(object):
def resurse_to_flat_dictionary(self, json, prefix=None):
if type(json) == dict:
- for k, v in json.items():
+ for k, v in list(json.items()):
if prefix is None:
- key = k.decode("utf-8").replace(" ", "_")
+ key = k.replace(" ", "_")
else:
- key = prefix + "." + k.decode("utf-8").replace(" ", "_")
- if hasattr(v, '__iter__'):
+ key = prefix + "." + k.replace(" ", "_")
+ if type(v) is list or type(v) is dict:
self.resurse_to_flat_dictionary(v, key)
else:
self.flat_dictionary[key] = str(v).replace(" ", "_")
@@ -45,7 +45,7 @@ class Converter(object):
index = 0
for v in json:
index += 1
- if hasattr(v, '__iter__'):
+ if type(v) is list or type(v) is dict:
self.resurse_to_flat_dictionary(
v, prefix + "." + str(index))
else:
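
A note on the iterability check above: under Python 3, strings define __iter__, so the old hasattr(v, '__iter__') test would have recursed into the individual characters of scalar values. A minimal sketch of the difference:

    # Python 3: str is iterable, so hasattr() misclassifies scalars.
    value = "plain text"
    print(hasattr(value, '__iter__'))                   # True
    print(type(value) is list or type(value) is dict)   # False, as intended
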
diff --git a/docker/storperf-master/storperf/carbon/emitter.py b/docker/storperf-master/storperf/carbon/emitter.py
index b196709..13503b2 100644
--- a/docker/storperf-master/storperf/carbon/emitter.py
+++ b/docker/storperf-master/storperf/carbon/emitter.py
@@ -40,19 +40,19 @@ class CarbonMetricTransmitter():
message = "%s %s %s\n" \
% (key, value, timestamp)
self.logger.debug("Metric: " + message.strip())
- carbon_socket.send(message)
+ carbon_socket.send(message.encode('utf-8'))
except ValueError:
self.logger.debug("Ignoring non numeric metric %s %s"
% (key, value))
message = "%s.commit-marker %s %s\n" \
% (commit_marker, timestamp, timestamp)
- carbon_socket.send(message)
+ carbon_socket.send(message.encode('utf-8'))
self.logger.debug("Marker %s" % message.strip())
self.logger.info("Sent metrics to %s:%s with timestamp %s"
% (self.host, self.port, timestamp))
- except Exception, e:
+ except Exception as e:
self.logger.error("While notifying carbon %s:%s %s"
% (self.host, self.port, e))
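
Under Python 3, socket.send() accepts only bytes, hence the encode() calls above. A hedged sketch of the plaintext Carbon line being sent (the metric path, value, and timestamp are illustrative):

    # Carbon plaintext protocol: "<metric.path> <value> <unix_timestamp>\n"
    message = "storperf.jobs.1.read.iops 4200 1560000000\n"
    payload = message.encode('utf-8')   # bytes, as Python 3 sockets require
    # carbon_socket.send(payload)       # with an open TCP socket to Carbon
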
diff --git a/docker/storperf-master/storperf/db/graphite_db.py b/docker/storperf-master/storperf/db/graphite_db.py
index 8ebd22e..59b9f5d 100644
--- a/docker/storperf-master/storperf/db/graphite_db.py
+++ b/docker/storperf-master/storperf/db/graphite_db.py
@@ -41,7 +41,7 @@ class GraphiteDB(object):
start = end - duration
request = ("http://%s:%s/graphite/render/?target="
- "%s(%s.*.jobs.1.%s.%s)"
+ "%s(%s.*.jobs.*.%s.%s)"
"&format=json"
"&from=%s"
"&until=%s"
diff --git a/docker/storperf-master/storperf/db/job_db.py b/docker/storperf-master/storperf/db/job_db.py
index eb35cac..c3632e4 100644
--- a/docker/storperf-master/storperf/db/job_db.py
+++ b/docker/storperf-master/storperf/db/job_db.py
@@ -220,7 +220,7 @@ class JobDB(object):
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
- for param, value in params.iteritems():
+ for param, value in params.items():
cursor.execute(
"""insert into job_params
(job_id,
@@ -265,7 +265,7 @@ class JobDB(object):
break
try:
data = json.loads(row[1])
- except:
+ except Exception:
data = row[1]
params[row[0]] = data
db.close()
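
Narrowing the bare except to except Exception keeps the fall-back-to-raw-string behaviour while letting SystemExit and KeyboardInterrupt propagate. The pattern, sketched with an illustrative row value:

    import json

    raw = "not-json"        # hypothetical row[1] payload
    try:
        data = json.loads(raw)
    except Exception:       # malformed JSON falls back to the raw string
        data = raw
    print(data)             # not-json
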
diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py
index a361eec..bb81eef 100644
--- a/docker/storperf-master/storperf/fio/fio_invoker.py
+++ b/docker/storperf-master/storperf/fio/fio_invoker.py
@@ -11,6 +11,7 @@ import json
import logging
from threading import Thread
import paramiko
+from storperf.utilities import ip_helper
class FIOInvoker(object):
@@ -45,6 +46,8 @@ class FIOInvoker(object):
self.json_body = ""
try:
for line in iter(stdout.readline, b''):
+ if type(line) == bytes:
+ line = line.decode('utf-8')
if line.startswith("fio"):
line = ""
continue
@@ -78,7 +81,8 @@ class FIOInvoker(object):
def stderr_handler(self, stderr):
self.logger.debug("Started")
for line in iter(stderr.readline, b''):
- self.logger.error("FIO Error: %s", line.rstrip())
+ if len(line) > 0:
+ self.logger.error("FIO Error: %s", line.rstrip())
self.stderr.append(line.rstrip())
# Sometime, FIO gets stuck and will give us this message:
@@ -137,10 +141,12 @@ class FIOInvoker(object):
ssh = self._ssh_client()
- command = "sudo killall fio"
-
- self.logger.debug("Executing on %s: %s" % (self.remote_host, command))
- (_, stdout, stderr) = ssh.exec_command(command)
+ kill_commands = ['sudo killall fio',
+ 'sudo pkill fio']
+ for command in kill_commands:
+ self.logger.debug("Executing on %s: %s" %
+ (self.remote_host, command))
+ (_, stdout, stderr) = ssh.exec_command(command)
for line in stdout.readlines():
self.logger.debug(line.strip())
@@ -153,13 +159,25 @@ class FIOInvoker(object):
def _ssh_client(self):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ address, port = ip_helper.parse_address_and_port(self.remote_host)
if 'username' in self.metadata and 'password' in self.metadata:
- ssh.connect(self.remote_host,
+ ssh.connect(address,
+ port=port,
+ username=self.metadata['username'],
+ password=self.metadata['password'],
+ timeout=5)
+ return ssh
+ elif 'username' in self.metadata and 'ssh_key' in self.metadata:
+ ssh.connect(address,
+ port=port,
username=self.metadata['username'],
- password=self.metadata['password'])
+ pkey=self.metadata['ssh_key'],
+ timeout=5)
return ssh
else:
- ssh.connect(self.remote_host, username='storperf',
+ ssh.connect(address,
+ port=port,
+ username='storperf',
key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ timeout=5)
return ssh
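
The rewritten _ssh_client adds an explicit port (parsed by ip_helper) and an in-memory private key path. A sketch of the pkey branch using paramiko's standard API; the host, port, and key file here are illustrative, not taken from the diff:

    from io import StringIO
    import paramiko

    key_text = open('storperf/resources/ssh/storperf_rsa').read()
    pkey = paramiko.RSAKey.from_private_key(StringIO(key_text))

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect('192.0.2.10', port=2222, username='storperf',
                pkey=pkey, timeout=5)
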
diff --git a/docker/storperf-master/storperf/resources/hot/agent-group.yaml b/docker/storperf-master/storperf/resources/hot/agent-group.yaml
index 4e79d81..f09d95a 100644
--- a/docker/storperf-master/storperf/resources/hot/agent-group.yaml
+++ b/docker/storperf-master/storperf/resources/hot/agent-group.yaml
@@ -7,7 +7,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-heat_template_version: 2017-09-01
+heat_template_version: newton
parameters:
public_network:
@@ -45,6 +45,12 @@ parameters:
availability_zone:
type: string
default: nova
+ subnet_CIDR:
+ type: string
+ default: '172.16.0.0/16'
+ keypair_name:
+ type: string
+ default: storperf_agent_keypair
resources:
slaves:
@@ -71,15 +77,12 @@ resources:
storperf_network:
type: OS::Neutron::Net
- properties:
- name: storperf-network
storperf_subnet:
type: OS::Neutron::Subnet
properties:
network_id: { get_resource: storperf_network }
- cidr: 172.16.0.0/16
- gateway_ip: 172.16.0.1
+ cidr: { get_param: subnet_CIDR}
storperf_network_router:
type: OS::Neutron::Router
@@ -97,7 +100,7 @@ resources:
type: OS::Nova::KeyPair
properties:
save_private_key: true
- name: storperf_agent_keypair
+ name: { get_param: keypair_name}
storperf_open_security_group:
type: OS::Neutron::SecurityGroup
diff --git a/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml b/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml
index 6314514..7a0a9e9 100644
--- a/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml
+++ b/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml
@@ -7,7 +7,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-heat_template_version: 2017-09-01
+heat_template_version: newton
parameters:
flavor:
diff --git a/docker/storperf-master/storperf/resources/hot/storperf-volume.yaml b/docker/storperf-master/storperf/resources/hot/storperf-volume.yaml
index cbdd861..d64d0c2 100644
--- a/docker/storperf-master/storperf/resources/hot/storperf-volume.yaml
+++ b/docker/storperf-master/storperf/resources/hot/storperf-volume.yaml
@@ -7,7 +7,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-heat_template_version: 2017-09-01
+heat_template_version: newton
parameters:
volume_size:
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index b17dae9..73f8f0d 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -7,16 +7,11 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from datetime import datetime
-import logging
-import os
-import socket
-from threading import Thread
-from time import sleep
-import paramiko
+from datetime import datetime
+from io import StringIO
+from multiprocessing.pool import ThreadPool
from scp import SCPClient
-
from snaps.config.stack import StackConfig
from snaps.openstack.create_stack import OpenStackHeatStack
from snaps.openstack.os_credentials import OSCreds
@@ -24,7 +19,14 @@ from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils
from snaps.thread_utils import worker_pool
from storperf.db.job_db import JobDB
from storperf.test_executor import TestExecutor
+from storperf.utilities import ip_helper
+from time import sleep
import json
+import logging
+import os
+import paramiko
+import socket
+import uuid
class ParameterError(Exception):
@@ -36,10 +38,11 @@ class StorPerfMaster(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
- self.job_db = JobDB()
+ self.reset_values()
+ self.job_db = JobDB()
self.stack_settings = StackConfig(
- name='StorPerfAgentGroup',
+ name=self.stack_name,
template_path='storperf/resources/hot/agent-group.yaml')
self.os_creds = OSCreds(
@@ -58,29 +61,38 @@ class StorPerfMaster(object):
self.heat_stack = OpenStackHeatStack(self.os_creds,
self.stack_settings)
+
+ self._snaps_pool = worker_pool(20)
+
+ def reset_values(self):
+ self._stack_name = 'StorPerfAgentGroup'
self.username = None
self.password = None
+ self._ssh_key = None
self._test_executor = None
self._agent_count = 1
- self._agent_image = "Ubuntu 14.04"
- self._agent_flavor = "storperf"
+ self._agent_image = None
+ self._agent_flavor = None
self._availability_zone = None
self._public_network = None
self._volume_count = 1
self._volume_size = 1
self._volume_type = None
- self._cached_stack_id = None
- self._last_snaps_check_time = None
self._slave_addresses = []
- self._thread_pool = worker_pool(20)
self._filename = None
self._deadline = None
self._steady_state_samples = 10
- self._queue_depths = [1, 4, 8]
- self._block_sizes = [512, 4096, 16384]
+ self._queue_depths = "1"
+ self._block_sizes = "4096"
self._workload_modules = []
self._custom_workloads = []
+ self._subnet_CIDR = '172.16.0.0/16'
self.slave_info = {}
+ self.stackless = False
+ self.mkfs = None
+ self.mount_device = None
+ self._last_snaps_check_time = None
+ self._cached_stack_id = None
@property
def volume_count(self):
@@ -119,6 +131,32 @@ class StorPerfMaster(object):
self._volume_type = value
@property
+ def stack_name(self):
+ return self._stack_name
+
+ @stack_name.setter
+ def stack_name(self, value):
+ if value is None:
+ self.stackless = True
+ else:
+ self.stackless = False
+ self._stack_name = value
+ self.stack_settings.name = self.stack_name
+ self.stack_id = None
+ self._last_snaps_check_time = None
+
+ @property
+ def subnet_CIDR(self):
+ return self._subnet_CIDR
+
+ @subnet_CIDR.setter
+ def subnet_CIDR(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change subnet CIDR after stack is created")
+ self._subnet_CIDR = value
+
+ @property
def agent_count(self):
self._get_stack_info()
return self._agent_count
@@ -170,6 +208,10 @@ class StorPerfMaster(object):
def slave_addresses(self):
return self._slave_addresses
+ @slave_addresses.setter
+ def slave_addresses(self, value):
+ self._slave_addresses = value
+
@property
def stack_id(self):
self._get_stack_info()
@@ -180,6 +222,10 @@ class StorPerfMaster(object):
self._cached_stack_id = value
def _get_stack_info(self):
+ if self.stackless:
+ self._cached_stack_id = None
+ return None
+
if self._last_snaps_check_time is not None:
time_since_check = datetime.now() - self._last_snaps_check_time
if time_since_check.total_seconds() < 60:
@@ -192,7 +238,7 @@ class StorPerfMaster(object):
cinder_cli = cinder_utils.cinder_client(self.os_creds)
glance_cli = glance_utils.glance_client(self.os_creds)
- router_worker = self._thread_pool.apply_async(
+ router_worker = self._snaps_pool.apply_async(
self.heat_stack.get_router_creators)
vm_inst_creators = self.heat_stack.get_vm_inst_creators()
@@ -210,7 +256,7 @@ class StorPerfMaster(object):
server = vm1.get_vm_inst()
- image_worker = self._thread_pool.apply_async(
+ image_worker = self._snaps_pool.apply_async(
glance_utils.get_image_by_id, (glance_cli, server.image_id))
self._volume_count = len(server.volume_ids)
@@ -316,6 +362,19 @@ class StorPerfMaster(object):
self._custom_workloads = value
@property
+ def ssh_key(self):
+ if self._ssh_key is None:
+ return None
+ key = StringIO(self._ssh_key)
+ pkey = paramiko.RSAKey.from_private_key(key)
+ key.close()
+ return pkey
+
+ @ssh_key.setter
+ def ssh_key(self, value):
+ self._ssh_key = value
+
+ @property
def is_stack_created(self):
return (self.stack_id is not None and
(self.heat_stack.get_status() == u'CREATE_COMPLETE' or
@@ -339,6 +398,8 @@ class StorPerfMaster(object):
return logs
def create_stack(self):
+ self.stackless = False
+
self.stack_settings.resource_files = [
'storperf/resources/hot/storperf-agent.yaml',
'storperf/resources/hot/storperf-volume.yaml']
@@ -398,8 +459,10 @@ class StorPerfMaster(object):
raise Exception("ERROR: Job {} is already running".format(
self._test_executor.job_id))
- if (self.stack_id is None):
- raise ParameterError("ERROR: Stack does not exist")
+ if (not self.stackless and
+ self.stack_id is None):
+ raise ParameterError("ERROR: Stack %s does not exist" %
+ self.stack_name)
self._test_executor = TestExecutor()
self._test_executor.register(self.executor_event)
@@ -413,30 +476,43 @@ class StorPerfMaster(object):
slaves = self._slave_addresses
- setup_threads = []
+ setup_pool = ThreadPool(processes=len(slaves))
+ workers = []
for slave in slaves:
- t = Thread(target=self._setup_slave, args=(slave,))
- setup_threads.append(t)
- t.start()
+ worker = setup_pool.apply_async(
+ self._setup_slave, (slave,))
+ workers.append(worker)
- for thread in setup_threads:
- thread.join()
+ for worker in workers:
+ worker.get()
+
+ setup_pool.close()
self._test_executor.slaves = slaves
self._test_executor.volume_count = self.volume_count
-
params = metadata
- params['agent_count'] = self.agent_count
+ params['agent_count'] = len(slaves)
+ params['agent_flavor'] = self.agent_flavor
+ params['agent_image'] = self.agent_image
+ params['agent_info'] = json.dumps(self.slave_info)
+ params['availability_zone'] = self.availability_zone
+ params['block_sizes'] = self.block_sizes
+ params['deadline'] = self.deadline
params['public_network'] = self.public_network
+ params['stack_name'] = self.stack_name
+ params['steady_state_samples'] = self.steady_state_samples
+ params['subnet_CIDR'] = self.subnet_CIDR
+ params['target'] = self.filename
params['volume_count'] = self.volume_count
params['volume_size'] = self.volume_size
- params['agent_info'] = json.dumps(self.slave_info)
- if self.volume_type is not None:
- params['volume_type'] = self.volume_type
- if self.username and self.password:
+ params['volume_type'] = self.volume_type
+ if self.username:
params['username'] = self.username
+ if self.password:
params['password'] = self.password
+ if self.ssh_key:
+ params['ssh_key'] = self.ssh_key
job_id = self._test_executor.execute(params)
self.slave_info = {}
@@ -503,7 +579,8 @@ class StorPerfMaster(object):
timer = 10
while not alive:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- result = s.connect_ex((slave, 22))
+ host, port = ip_helper.parse_address_and_port(slave)
+ result = s.connect_ex((host, port))
s.close()
if result:
@@ -520,13 +597,26 @@ class StorPerfMaster(object):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.username and self.password:
- ssh.connect(slave,
- username=self.username,
- password=self.password)
+ ssh.connect(
+ host,
+ port=port,
+ username=self.username,
+ password=self.password,
+ timeout=2)
+ elif self.username and self.ssh_key:
+ ssh.connect(
+ host,
+ port=port,
+ username=self.username,
+ pkey=self.ssh_key,
+ timeout=2)
else:
- ssh.connect(slave, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ ssh.connect(
+ slave,
+ port=port,
+ username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
uname = self._get_uname(ssh)
logger.debug("Slave uname is %s" % uname)
@@ -550,6 +640,12 @@ class StorPerfMaster(object):
logger.debug("Transferring fio to %s" % slave)
scp.put('/usr/local/bin/fio', '~/')
+ if self.mkfs is not None:
+ self._mkfs(ssh, logger)
+
+ if self.mount_device is not None:
+ self._mount(ssh, logger)
+
def _get_uname(self, ssh):
(_, stdout, _) = ssh.exec_command("uname -a")
return stdout.readline()
@@ -562,6 +658,59 @@ class StorPerfMaster(object):
available = lines[3]
return int(available)
+ def _mkfs(self, ssh, logger):
+ command = "sudo umount %s" % (self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Error executing on {0}: {1}".format(
+ command, error_messages))
+
+ def _mount(self, ssh, logger):
+ command = "sudo mkdir -p %s" % (self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
+ command = "sudo mount %s %s" % (self.mount_device, self.filename)
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ rc = stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ error_messages = ""
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+ error_messages += line.rstrip()
+
+ if rc != 0:
+ raise Exception(
+ "Could not mount {0}: {1}".format(
+ self.mount_device, error_messages))
+
def _resize_root_fs(self, ssh, logger):
command = "sudo /usr/sbin/resize2fs /dev/vda1"
logger.info("Attempting %s" % command)
@@ -573,11 +722,14 @@ class StorPerfMaster(object):
logger.error(line)
def _make_parameters(self):
+ random_str = uuid.uuid4().hex[:6].upper()
heat_parameters = {}
heat_parameters['public_network'] = self.public_network
heat_parameters['agent_count'] = self.agent_count
heat_parameters['volume_count'] = self.volume_count
heat_parameters['volume_size'] = self.volume_size
+ heat_parameters['keypair_name'] = 'storperf_agent_keypair' + random_str
+ heat_parameters['subnet_CIDR'] = self.subnet_CIDR
if self.volume_type is not None:
heat_parameters['volume_type'] = self.volume_type
heat_parameters['agent_image'] = self.agent_image
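
Among the new Heat parameters, the keypair name now carries a random suffix so concurrent stacks do not collide on a single shared keypair. The suffix generation, as coded in _make_parameters above:

    import uuid

    random_str = uuid.uuid4().hex[:6].upper()
    print('storperf_agent_keypair' + random_str)   # e.g. storperf_agent_keypair1A2B3C
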
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index 0ab5698..cb7e478 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -217,18 +217,19 @@ class TestExecutor(object):
def execute(self, metadata):
self.job_db.create_job_id()
+ self._setup_metadata(metadata)
try:
self.test_params()
except Exception as e:
self.terminate()
raise e
- self.job_db.record_workload_params(metadata)
- self._setup_metadata(metadata)
+ stripped_metadata = metadata.copy()
+ stripped_metadata.pop('ssh_key', None)
+ self.job_db.record_workload_params(stripped_metadata)
self._workload_thread = Thread(target=self.execute_workloads,
args=(),
name="Workload thread")
self._workload_thread.start()
- # seems to be hanging here
return self.job_db.job_id
def terminate(self):
@@ -263,10 +264,12 @@ class TestExecutor(object):
def _execute_workload(self, current_workload, workload, parse_only=False):
workload.options['iodepth'] = str(current_workload['queue-depth'])
workload.options['bs'] = str(current_workload['blocksize'])
+ self._workload_executors = []
slave_threads = []
thread_pool = ThreadPool(processes=len(self.slaves) *
self.volume_count)
+ self._workload_executors = []
for slave in self.slaves:
volume_number = 0
while volume_number < self.volume_count:
@@ -313,8 +316,9 @@ class TestExecutor(object):
continue
workload = current_workload['workload']
- self._thread_gate = ThreadGate(len(self.slaves),
- workload.options['status-interval'])
+ self._thread_gate = ThreadGate(
+ len(self.slaves) * min(1, self.volume_count),
+ float(workload.options['status-interval']))
self.current_workload = current_workload['name']
@@ -358,20 +362,25 @@ class TestExecutor(object):
workloads = []
if self._custom_workloads:
- for workload_name in self._custom_workloads.iterkeys():
- if not workload_name.isalnum():
+ for workload_name in self._custom_workloads.keys():
+ real_name = workload_name
+ if real_name.startswith('_'):
+ real_name = real_name.replace('_', '')
+ self.logger.info("--- real_name: %s" % real_name)
+
+ if not real_name.isalnum():
raise InvalidWorkloadName(
"Workload name must be alphanumeric only: %s" %
- workload_name)
+ real_name)
workload = _custom_workload()
- workload.options['name'] = workload_name
+ workload.options['name'] = real_name
workload.name = workload_name
if (self.filename is not None):
workload.filename = self.filename
workload.id = self.job_db.job_id
workload_params = self._custom_workloads[workload_name]
- for param, value in workload_params.iteritems():
+ for param, value in workload_params.items():
if param == "readwrite":
param = "rw"
if param in workload.fixed_options:
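
Custom workload names that begin with an underscore now have every underscore removed before the alphanumeric check, per the loop above (the name below is illustrative):

    workload_name = '_my_custom_job'
    real_name = workload_name
    if real_name.startswith('_'):
        real_name = real_name.replace('_', '')    # strips all underscores
    print(real_name, real_name.isalnum())         # mycustomjob True
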
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
index c7d70a7..98ae640 100644
--- a/docker/storperf-master/storperf/utilities/data_handler.py
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -59,6 +59,12 @@ class DataHandler(object):
metrics[metric][io_type]['series'] = series
metrics[metric][io_type]['steady_state'] = steady
+ metrics[metric][io_type]['series_slope'] = \
+ math.slope_series(series)
+ metrics[metric][io_type]['series_min'] = \
+ math.min_series(series)
+ metrics[metric][io_type]['series_max'] = \
+ math.max_series(series)
treated_data = DataTreatment.data_treatment(series)
metrics[metric][io_type]['slope'] = \
@@ -151,9 +157,11 @@ class DataHandler(object):
test_db = os.environ.get('TEST_DB_URL')
if test_db is not None:
self.logger.info("Pushing results to %s" % (test_db))
+ stripped_metadata = executor.metadata.copy()
+ stripped_metadata.pop("ssh_key", None)
response = test_results_db.push_results_to_db(
test_db,
- executor.metadata,
+ stripped_metadata,
self.logger)
if response:
self.logger.info("Results reference: %s" % response['href'])
diff --git a/docker/storperf-master/storperf/utilities/ip_helper.py b/docker/storperf-master/storperf/utilities/ip_helper.py
new file mode 100644
index 0000000..06087b0
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/ip_helper.py
@@ -0,0 +1,27 @@
+##############################################################################
+# Copyright (c) 2019 VMware and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+
+def parse_address_and_port(address):
+ port = 22
+ if '.' in address:
+ # this is IPv4
+ if ':' in address:
+ host = address.split(':')[0]
+ port = int(address.split(':')[1])
+ else:
+ host = address
+ else:
+ if ']' in address:
+ # this is IPv6
+ host = address.split(']')[0].split('[')[1]
+ port = int(address.split(']')[1].split(':')[1])
+ else:
+ host = address
+ return (host, port)
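
Expected behaviour of the new helper, inferred directly from its implementation (the addresses are illustrative):

    print(parse_address_and_port('192.168.1.5'))          # ('192.168.1.5', 22)
    print(parse_address_and_port('192.168.1.5:2222'))     # ('192.168.1.5', 2222)
    print(parse_address_and_port('[2001:db8::1]:2022'))   # ('2001:db8::1', 2022)
    print(parse_address_and_port('2001:db8::1'))          # ('2001:db8::1', 22)
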
diff --git a/docker/storperf-master/storperf/utilities/math.py b/docker/storperf-master/storperf/utilities/math.py
index 8e04134..173c39e 100644
--- a/docker/storperf-master/storperf/utilities/math.py
+++ b/docker/storperf-master/storperf/utilities/math.py
@@ -8,6 +8,9 @@
##############################################################################
import copy
+RANGE_DEVIATION = 0.20
+SLOPE_DEVIATION = 0.10
+
def slope(data_series):
"""
@@ -114,3 +117,67 @@ def average(data_series):
average = data_sum / float(m)
return average
+
+
+def slope_series(data_series):
+ """
+ This function returns an adjusted series based on the average
+ for the supplied series and the slope of the series.
+ """
+
+ new_series = []
+ average_series = []
+ for l in data_series:
+ average_series.append(l[1])
+
+ avg = average(average_series)
+ slp = slope(data_series)
+
+ if slp is None or avg is None:
+ return new_series
+
+ multiplier = float(len(data_series) + 1) / 2.0 - len(data_series)
+ for index, _ in data_series:
+ new_value = avg + (slp * multiplier)
+ new_series.append([index, new_value])
+ multiplier += 1
+
+ return new_series
+
+
+def min_series(data_series):
+ """
+ This function returns a copy of the series with only the
+ minimum allowed deviation as its values
+ """
+
+ new_series = []
+ average_series = []
+ for l in data_series:
+ average_series.append(l[1])
+ avg = average(average_series)
+ low = avg - (avg * RANGE_DEVIATION)
+
+ for index, _ in data_series:
+ new_series.append([index, low])
+
+ return new_series
+
+
+def max_series(data_series):
+ """
+ This function returns a copy of the series with only the
+ maximum allowed deviation as its values
+ """
+
+ new_series = []
+ average_series = []
+ for l in data_series:
+ average_series.append(l[1])
+ avg = average(average_series)
+ high = avg + (avg * RANGE_DEVIATION)
+
+ for index, _ in data_series:
+ new_series.append([index, high])
+
+ return new_series
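
A worked example of the three new helpers, assuming storperf's least-squares slope(): for perfectly linear input the slope-adjusted series reproduces the input, while min_series and max_series flatten to the average minus or plus the 20% RANGE_DEVIATION band:

    series = [[1, 10.0], [2, 20.0], [3, 30.0]]    # [index, value] pairs; avg = 20
    # slope_series(series) -> [[1, 10.0], [2, 20.0], [3, 30.0]]
    # min_series(series)   -> [[1, 16.0], [2, 16.0], [3, 16.0]]   (20 * 0.8)
    # max_series(series)   -> [[1, 24.0], [2, 24.0], [3, 24.0]]   (20 * 1.2)
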
diff --git a/docker/storperf-master/storperf/utilities/steady_state.py b/docker/storperf-master/storperf/utilities/steady_state.py
index 13f609d..23a74d7 100644
--- a/docker/storperf-master/storperf/utilities/steady_state.py
+++ b/docker/storperf-master/storperf/utilities/steady_state.py
@@ -9,7 +9,8 @@
import logging
from storperf.utilities import data_treatment as DataTreatment
-from storperf.utilities import math as math
+from storperf.utilities import math
+from storperf.utilities.math import RANGE_DEVIATION, SLOPE_DEVIATION
def steady_state(data_series):
@@ -38,15 +39,19 @@ def steady_state(data_series):
average_value is not None):
# Verification of the Steady State conditions following the SNIA
# definition
- range_condition = abs(range_value) <= 0.20 * abs(average_value)
- slope_condition = abs(slope_value) <= 0.10 * abs(average_value)
+ range_condition = abs(range_value) <= RANGE_DEVIATION * \
+ abs(average_value)
+ slope_condition = abs(slope_value) <= SLOPE_DEVIATION * \
+ abs(average_value)
steady_state = range_condition and slope_condition
logger.debug("Range %s <= %s?" % (abs(range_value),
- (0.20 * abs(average_value))))
+ (RANGE_DEVIATION *
+ abs(average_value))))
logger.debug("Slope %s <= %s?" % (abs(slope_value),
- (0.10 * abs(average_value))))
+ (SLOPE_DEVIATION *
+ abs(average_value))))
logger.debug("Steady State? %s" % steady_state)
else:
steady_state = False
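
The SNIA criterion, now expressed with the shared constants: the window's range must stay within 20% of its average and its best-fit slope within 10%. A numeric check with illustrative values:

    RANGE_DEVIATION, SLOPE_DEVIATION = 0.20, 0.10
    average_value, range_value, slope_value = 1000.0, 150.0, 50.0
    steady = (abs(range_value) <= RANGE_DEVIATION * abs(average_value) and
              abs(slope_value) <= SLOPE_DEVIATION * abs(average_value))
    print(steady)   # True: 150 <= 200 and 50 <= 100
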
diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py
index 9b04314..5aa596e 100644
--- a/docker/storperf-master/storperf/workloads/_base_workload.py
+++ b/docker/storperf-master/storperf/workloads/_base_workload.py
@@ -44,17 +44,24 @@ class _base_workload(object):
self.options['size'] = "100%"
self.logger.debug(
"Profiling a device, using 100% of " + self.filename)
+ self.options['filename'] = self.filename
else:
- self.options['size'] = self.default_filesize
+ if 'size' not in self.options:
+ self.options['size'] = self.default_filesize
self.logger.debug("Profiling a filesystem, using " +
- self.default_filesize + " file")
-
- self.options['filename'] = self.filename
+ self.options['size'] + " file")
+ if not self.filename.endswith('/'):
+ self.filename = self.filename + "/"
+ self.options['directory'] = self.filename
+ self.options['filename_format'] = "'storperf.$jobnum.$filenum'"
self.setup()
- for key, value in self.options.iteritems():
- args.append('--' + key + "=" + value)
+ for key, value in self.options.items():
+ if value is not None:
+ args.append('--' + key + "=" + str(value))
+ else:
+ args.append('--' + key)
if parse_only:
args.append('--parse-only')
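
When profiling a filesystem, fio is now pointed at a directory with a filename_format rather than a single filename. A sketch of the resulting argument construction (option values are illustrative):

    options = {
        'size': '1G',
        'directory': '/mnt/storperf/',
        'filename_format': "'storperf.$jobnum.$filenum'",
    }
    args = []
    for key, value in options.items():
        if value is not None:
            args.append('--' + key + '=' + str(value))
        else:
            args.append('--' + key)    # bare flag when the value is None
    print(args)   # ['--size=1G', '--directory=/mnt/storperf/', ...]
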
diff --git a/docker/storperf-master/storperf/workloads/_custom_workload.py b/docker/storperf-master/storperf/workloads/_custom_workload.py
index 9e0100d..5cd37b3 100644
--- a/docker/storperf-master/storperf/workloads/_custom_workload.py
+++ b/docker/storperf-master/storperf/workloads/_custom_workload.py
@@ -18,12 +18,12 @@ class _custom_workload(_base_workload._base_workload):
self.default_filesize = "1G"
self.filename = '/dev/vdb'
self.fixed_options = {
- 'loops': '200',
'output-format': 'json',
'status-interval': '60'
}
self.options = {
'ioengine': 'libaio',
+ 'loops': '200',
'direct': '1',
'numjobs': '1',
'rw': 'read',