summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xci/verify.sh3
-rw-r--r--cli.py6
-rw-r--r--rest_server.py4
-rw-r--r--storperf/carbon/emitter.py8
-rw-r--r--storperf/db/job_db.py10
-rw-r--r--storperf/fio/fio_invoker.py11
-rw-r--r--storperf/storperf_master.py3
-rw-r--r--storperf/test_executor.py84
-rw-r--r--storperf/tests/db_tests/job_db_test.py39
-rw-r--r--storperf/tests/workload_tests/workload_subclass_test.py54
-rw-r--r--storperf/workloads/_base_workload.py27
11 files changed, 154 insertions, 95 deletions
diff --git a/ci/verify.sh b/ci/verify.sh
index 2f67e94..70ecb6a 100755
--- a/ci/verify.sh
+++ b/ci/verify.sh
@@ -19,6 +19,8 @@ virtualenv $WORKSPACE/storperf_venv
source $WORKSPACE/storperf_venv/bin/activate
pip install setuptools
+pip install autoflake=00.6.6
+pip install autopep8==1.2.2
pip install coverage==4.0.3
pip install flask==0.10
pip install flask-restful==0.3.5
@@ -27,6 +29,7 @@ pip install flake8==2.5.4
pip install html2text==2016.1.8
pip install mock==1.3.0
pip install nose==1.3.7
+pip install pysqlite==2.8.2
pip install python-cinderclient==1.6.0
pip install python-glanceclient==1.1.0
pip install python-heatclient==0.8.0
diff --git a/cli.py b/cli.py
index 560d77d..5595314 100644
--- a/cli.py
+++ b/cli.py
@@ -10,7 +10,6 @@
"""
from storperf.storperf_master import StorPerfMaster
-from storperf.test_executor import UnknownWorkload
from threading import Thread
import cPickle
import getopt
@@ -18,13 +17,10 @@ import json
import logging
import logging.config
import logging.handlers
-import os
import socket
import struct
import sys
-import time
-import html2text
import requests
@@ -49,7 +45,7 @@ class LogRecordStreamHandler(object):
while True:
datagram = self.socket.recv(8192)
chunk = datagram[0:4]
- slen = struct.unpack(">L", chunk)[0]
+ struct.unpack(">L", chunk)[0]
chunk = datagram[4:]
obj = cPickle.loads(chunk)
record = logging.makeLogRecord(obj)
diff --git a/rest_server.py b/rest_server.py
index 073004a..ffb750e 100644
--- a/rest_server.py
+++ b/rest_server.py
@@ -62,7 +62,6 @@ class Configure(Resource):
storperf.delete_stack()
except Exception as e:
abort(400, str(e))
- pass
class StartJob(Resource):
@@ -87,6 +86,9 @@ class StartJob(Resource):
storperf.workloads = request.json['workload']
else:
storperf.workloads = None
+ # Add block size, queue depth, number of passes here.
+ if ('workload' in request.json):
+ storperf.workloads = request.json['workload']
job_id = storperf.execute_workloads()
diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py
index 8a9f734..6104fd4 100644
--- a/storperf/carbon/emitter.py
+++ b/storperf/carbon/emitter.py
@@ -26,12 +26,12 @@ class CarbonMetricTransmitter():
else:
timestamp = str(calendar.timegm(time.gmtime()))
- self.carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.carbon_socket.connect((self.carbon_host, self.carbon_port))
+ carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ carbon_socket.connect((self.carbon_host, self.carbon_port))
for key, metric in metrics.items():
message = key + " " + metric + " " + timestamp
self.logger.debug("Metric: " + message)
- self.carbon_socket.send(message + '\n')
+ carbon_socket.send(message + '\n')
- self.carbon_socket.close()
+ carbon_socket.close()
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py
index 57c82cb..8aa4c11 100644
--- a/storperf/db/job_db.py
+++ b/storperf/db/job_db.py
@@ -76,11 +76,13 @@ class JobDB(object):
db.commit()
db.close()
- def start_workload(self, workload_name):
+ def start_workload(self, workload):
"""
Records the start time for the given workload
"""
+ workload_name = workload.fullname
+
if (self.job_id is None):
self.create_job_id()
@@ -122,13 +124,15 @@ class JobDB(object):
db.commit()
db.close()
- def end_workload(self, workload_name):
+ def end_workload(self, workload):
"""
Records the end time for the given workload
"""
if (self.job_id is None):
self.create_job_id()
+ workload_name = workload.fullname
+
with db_mutex:
db = sqlite3.connect(JobDB.db_name)
@@ -174,8 +178,6 @@ class JobDB(object):
workload_prefix = workload_prefix + "%"
- stats = ()
-
start_time = str(calendar.timegm(time.gmtime()))
end_time = "0"
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index 5e30a76..fad2546 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -8,7 +8,6 @@
##############################################################################
from threading import Thread
-import cmd
import json
import logging
import subprocess
@@ -48,17 +47,17 @@ class FIOInvoker(object):
self.json_body += line
try:
if line == "}\n":
- self.logger.debug(
- "Have a json snippet: %s", self.json_body)
json_metric = json.loads(self.json_body)
self.json_body = ""
for event_listener in self.event_listeners:
- event_listener(self.callback_id, json_metric)
-
+ try:
+ event_listener(self.callback_id, json_metric)
+ except Exception, e:
+ self.logger.error("Notifying listener %s: %s",
+ self.callback_id, e)
except Exception, e:
self.logger.error("Error parsing JSON: %s", e)
- pass
except ValueError:
pass # We might have read from the closed socket, ignore it
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index a467aef..b4fef7f 100644
--- a/storperf/storperf_master.py
+++ b/storperf/storperf_master.py
@@ -202,7 +202,6 @@ class StorPerfMaster(object):
parameters=self._make_parameters())
self.stack_id = stack['stack']['id']
- pass
def validate_stack(self):
self._attach_to_openstack()
@@ -232,8 +231,6 @@ class StorPerfMaster(object):
self._heat_client.stacks.delete(stack_id=self.stack_id)
self.stack_id = None
- pass
-
def execute_workloads(self):
if (self.stack_id is None):
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 734b514..497d17c 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -14,6 +14,7 @@ from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from threading import Thread
+import copy
import imp
import logging
import os
@@ -37,6 +38,7 @@ class TestExecutor(object):
self.prefix = None
self.job_db = JobDB()
self._slaves = []
+ self._workload_thread = None
@property
def slaves(self):
@@ -58,22 +60,9 @@ class TestExecutor(object):
metric,
callback_id)
- read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
- write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
- read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
- write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
-
- message = "Average Latency us Read/Write: " + read_latency \
- + "/" + write_latency + " IOPS r/w: " + \
- read_iops + "/" + write_iops
-
- for event_listener in self.event_listeners:
- event_listener(message)
-
self.metrics_emitter.transmit_metrics(carbon_metrics)
def register_workloads(self, workloads):
-
if (workloads is None or len(workloads) == 0):
workload_dir = os.path.normpath(
os.path.join(os.path.dirname(__file__), "workloads"))
@@ -124,66 +113,59 @@ class TestExecutor(object):
return None
def execute(self):
-
self.job_db.create_job_id()
- for slave in self.slaves:
- t = Thread(target=self.execute_on_node, args=(slave,))
- t.daemon = False
- t.start()
-
+ self._workload_thread = Thread(target=self.execute_workloads, args=())
+ self._workload_thread.start()
return self.job_db.job_id
- def execute_on_node(self, remote_host):
-
- logger = logging.getLogger(__name__ + ":" + remote_host)
-
- invoker = FIOInvoker()
- invoker.remote_host = remote_host
- invoker.register(self.event)
-
- logger.info(
- "Starting job " + self.job_db.job_id + " on " + remote_host)
-
+ def execute_workloads(self):
for workload_module in self.workload_modules:
-
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
- logger.debug(
- "Found workload: " + str(constructorMethod))
workload = constructorMethod()
if (self.filename is not None):
workload.filename = self.filename
- workload.invoker = invoker
if (workload_name.startswith("_")):
iodepths = [2, ]
- blocksizes = [65536, ]
+ blocksizes = [8192, ]
else:
iodepths = [1, 16, 128]
- blocksizes = [4096, 65536, 1048576]
+ blocksizes = [8192, 4096, 512]
+
+ workload.id = self.job_db.job_id
for blocksize in blocksizes:
for iodepth in iodepths:
-
- full_workload_name = workload_name + \
- ".host." + remote_host + \
- ".queue-depth." + str(iodepth) + \
- ".block-size." + str(blocksize)
-
workload.options['iodepth'] = str(iodepth)
workload.options['bs'] = str(blocksize)
- self.logger.info(
- "Executing workload: " + full_workload_name)
- invoker.callback_id = self.job_db.job_id + \
- "." + full_workload_name
+ slave_threads = []
+ for slave in self.slaves:
+ slave_workload = copy.copy(workload)
+ slave_workload.remote_host = slave
+ t = Thread(target=self.execute_on_node,
+ args=(slave_workload,))
+ t.daemon = False
+ t.start()
+ slave_threads.append(t)
+
+ for slave_thread in slave_threads:
+ slave_thread.join()
+
+ def execute_on_node(self, workload):
+
+ invoker = FIOInvoker()
+ invoker.register(self.event)
+ workload.invoker = invoker
+
+ self.logger.info("Starting " + workload.fullname)
- self.job_db.start_workload(full_workload_name)
- workload.execute()
- self.job_db.end_workload(full_workload_name)
+ self.job_db.start_workload(workload)
+ workload.execute()
+ self.job_db.end_workload(workload)
- logger.info(
- "Finished job " + self.job_db.job_id + " on " + remote_host)
+ self.logger.info("Ended " + workload.fullname)
def fetch_results(self, job, workload_name=""):
self.job_db.job_id = job
diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py
index 4620412..92b1482 100644
--- a/storperf/tests/db_tests/job_db_test.py
+++ b/storperf/tests/db_tests/job_db_test.py
@@ -8,6 +8,7 @@
##############################################################################
from storperf.db.job_db import JobDB
+from storperf.workloads.rr import rr
import os
import sqlite3
import unittest
@@ -60,7 +61,7 @@ class JobDBTest(unittest.TestCase):
start_time = "12345"
mock_calendar.side_effect = (start_time,)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
@@ -69,26 +70,26 @@ class JobDBTest(unittest.TestCase):
"""select * from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
self.assertEqual(None,
row.fetchone(),
"Should not have been a row in the db")
- self.job.start_workload(workload_name)
+ self.job.start_workload(workload)
cursor.execute(
"""select job_id, workload, start from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
@mock.patch("uuid.uuid4")
@@ -99,10 +100,10 @@ class JobDBTest(unittest.TestCase):
end_time = "54321"
mock_calendar.side_effect = (start_time, end_time,)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
- self.job.start_workload(workload_name)
- self.job.end_workload(workload_name)
+ self.job.start_workload(workload)
+ self.job.end_workload(workload)
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
@@ -110,14 +111,14 @@ class JobDBTest(unittest.TestCase):
"""select job_id, workload, start, end from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
self.assertEqual(end_time, row[3], "Did not expect " + str(row[3]))
@@ -130,26 +131,26 @@ class JobDBTest(unittest.TestCase):
mock_calendar.side_effect = (start_time_1, start_time_2)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
- self.job.start_workload(workload_name)
- self.job.start_workload(workload_name)
+ self.job.start_workload(workload)
+ self.job.start_workload(workload)
cursor.execute(
"""select job_id, workload, start from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2]))
@mock.patch("uuid.uuid4")
@@ -160,9 +161,9 @@ class JobDBTest(unittest.TestCase):
end_time = "54321"
mock_calendar.side_effect = (start_time, end_time,)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
- self.job.end_workload(workload_name)
+ self.job.end_workload(workload)
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
@@ -170,14 +171,14 @@ class JobDBTest(unittest.TestCase):
"""select job_id, workload, start, end from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
# The start time is set to the same time as end if it was never set
# before
self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
diff --git a/storperf/tests/workload_tests/workload_subclass_test.py b/storperf/tests/workload_tests/workload_subclass_test.py
new file mode 100644
index 0000000..97b6b46
--- /dev/null
+++ b/storperf/tests/workload_tests/workload_subclass_test.py
@@ -0,0 +1,54 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+from storperf.workloads.rr import rr
+from storperf.workloads.rs import rs
+from storperf.workloads.rw import rw
+from storperf.workloads.wr import wr
+from storperf.workloads.ws import ws
+
+
+class WorkloadSubclassTest(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def test_local_name(self):
+ workload = rr()
+ self.assertEqual(workload.fullname,
+ "None.rr.None.queue-depth.1.block-size.64k",
+ workload.fullname)
+
+ def test_remote_name(self):
+ workload = rw()
+ workload.remote_host = "192.168.0.1"
+ self.assertEqual(workload.fullname,
+ "None.rw.192-168-0-1.queue-depth.1.block-size.64k",
+ workload.fullname)
+
+ def test_blocksize(self):
+ workload = rs()
+ workload.options["bs"] = "4k"
+ self.assertEqual(workload.fullname,
+ "None.rs.None.queue-depth.1.block-size.4k",
+ workload.fullname)
+
+ def test_queue_depth(self):
+ workload = wr()
+ workload.options["iodepth"] = "8"
+ self.assertEqual(workload.fullname,
+ "None.wr.None.queue-depth.8.block-size.64k",
+ workload.fullname)
+
+ def test_id(self):
+ workload = ws()
+ workload.id = "workloadid"
+ self.assertEqual(workload.fullname,
+ "workloadid.ws.None.queue-depth.1.block-size.64k",
+ workload.fullname)
diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py
index f7c14ad..4eccc08 100644
--- a/storperf/workloads/_base_workload.py
+++ b/storperf/workloads/_base_workload.py
@@ -13,7 +13,7 @@ import logging
class _base_workload(object):
def __init__(self):
- self.logger = logging.getLogger(__name__)
+ self.logger = logging.getLogger(self.__class__.__name__)
self.default_filesize = "100%"
self.filename = '/dev/vdb'
self.options = {
@@ -25,12 +25,19 @@ class _base_workload(object):
'numjobs': '1',
'loops': '2',
'output-format': 'json',
- 'status-interval': '600'
+ 'status-interval': '60'
}
self.invoker = None
+ self.remote_host = None
+ self.id = None
def execute(self):
+ if self.invoker is None:
+ raise ValueError("No invoker has been set")
+
args = []
+ self.invoker.remote_host = self.remote_host
+ self.invoker.callback_id = self.fullname
if self.filename.startswith("/dev"):
self.options['size'] = "100%"
@@ -52,3 +59,19 @@ class _base_workload(object):
def setup(self):
pass
+
+ @property
+ def remote_host(self):
+ return str(self._remote_host)
+
+ @remote_host.setter
+ def remote_host(self, value):
+ self._remote_host = value
+
+ @property
+ def fullname(self):
+ return str(self.id) + "." + \
+ self.__class__.__name__ + "." + \
+ str(self.remote_host).replace(".", "-") + \
+ ".queue-depth." + str(self.options['iodepth']) + \
+ ".block-size." + str(self.options['bs'])