summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMark Beierl <mark.beierl@emc.com>2016-04-25 09:55:03 -0400
committerMark Beierl <mark.beierl@emc.com>2016-04-25 10:51:31 -0400
commit311eee3bec00d5acc32b6eba76a7ff0d1990f4b2 (patch)
tree9fe4ee0f33687ebaed1c82ff3bea0d569039cc78
parent07a375f25ee831100ecf21e7bb9c1bdcd3b960f5 (diff)
Job run lifecycle rework
Change the way slave jobs are managed so that they are in step with each other, and we can track the overall thread that is running them. This lays groundwork for STORPERF-20 and STORPERF-44 JIRA: STORPERF-33 STORPERF-43 Change-Id: Iaff48a2823ba85d6512e9782fd9091a72639835c Signed-off-by: Mark Beierl <mark.beierl@emc.com>
-rwxr-xr-xci/verify.sh3
-rw-r--r--cli.py6
-rw-r--r--rest_server.py4
-rw-r--r--storperf/carbon/emitter.py8
-rw-r--r--storperf/db/job_db.py10
-rw-r--r--storperf/fio/fio_invoker.py11
-rw-r--r--storperf/storperf_master.py3
-rw-r--r--storperf/test_executor.py84
-rw-r--r--storperf/tests/db_tests/job_db_test.py39
-rw-r--r--storperf/tests/workload_tests/workload_subclass_test.py54
-rw-r--r--storperf/workloads/_base_workload.py27
11 files changed, 154 insertions, 95 deletions
diff --git a/ci/verify.sh b/ci/verify.sh
index 2f67e94..70ecb6a 100755
--- a/ci/verify.sh
+++ b/ci/verify.sh
@@ -19,6 +19,8 @@ virtualenv $WORKSPACE/storperf_venv
source $WORKSPACE/storperf_venv/bin/activate
pip install setuptools
+pip install autoflake=00.6.6
+pip install autopep8==1.2.2
pip install coverage==4.0.3
pip install flask==0.10
pip install flask-restful==0.3.5
@@ -27,6 +29,7 @@ pip install flake8==2.5.4
pip install html2text==2016.1.8
pip install mock==1.3.0
pip install nose==1.3.7
+pip install pysqlite==2.8.2
pip install python-cinderclient==1.6.0
pip install python-glanceclient==1.1.0
pip install python-heatclient==0.8.0
diff --git a/cli.py b/cli.py
index 560d77d..5595314 100644
--- a/cli.py
+++ b/cli.py
@@ -10,7 +10,6 @@
"""
from storperf.storperf_master import StorPerfMaster
-from storperf.test_executor import UnknownWorkload
from threading import Thread
import cPickle
import getopt
@@ -18,13 +17,10 @@ import json
import logging
import logging.config
import logging.handlers
-import os
import socket
import struct
import sys
-import time
-import html2text
import requests
@@ -49,7 +45,7 @@ class LogRecordStreamHandler(object):
while True:
datagram = self.socket.recv(8192)
chunk = datagram[0:4]
- slen = struct.unpack(">L", chunk)[0]
+ struct.unpack(">L", chunk)[0]
chunk = datagram[4:]
obj = cPickle.loads(chunk)
record = logging.makeLogRecord(obj)
diff --git a/rest_server.py b/rest_server.py
index 073004a..ffb750e 100644
--- a/rest_server.py
+++ b/rest_server.py
@@ -62,7 +62,6 @@ class Configure(Resource):
storperf.delete_stack()
except Exception as e:
abort(400, str(e))
- pass
class StartJob(Resource):
@@ -87,6 +86,9 @@ class StartJob(Resource):
storperf.workloads = request.json['workload']
else:
storperf.workloads = None
+ # Add block size, queue depth, number of passes here.
+ if ('workload' in request.json):
+ storperf.workloads = request.json['workload']
job_id = storperf.execute_workloads()
diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py
index 8a9f734..6104fd4 100644
--- a/storperf/carbon/emitter.py
+++ b/storperf/carbon/emitter.py
@@ -26,12 +26,12 @@ class CarbonMetricTransmitter():
else:
timestamp = str(calendar.timegm(time.gmtime()))
- self.carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.carbon_socket.connect((self.carbon_host, self.carbon_port))
+ carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ carbon_socket.connect((self.carbon_host, self.carbon_port))
for key, metric in metrics.items():
message = key + " " + metric + " " + timestamp
self.logger.debug("Metric: " + message)
- self.carbon_socket.send(message + '\n')
+ carbon_socket.send(message + '\n')
- self.carbon_socket.close()
+ carbon_socket.close()
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py
index 57c82cb..8aa4c11 100644
--- a/storperf/db/job_db.py
+++ b/storperf/db/job_db.py
@@ -76,11 +76,13 @@ class JobDB(object):
db.commit()
db.close()
- def start_workload(self, workload_name):
+ def start_workload(self, workload):
"""
Records the start time for the given workload
"""
+ workload_name = workload.fullname
+
if (self.job_id is None):
self.create_job_id()
@@ -122,13 +124,15 @@ class JobDB(object):
db.commit()
db.close()
- def end_workload(self, workload_name):
+ def end_workload(self, workload):
"""
Records the end time for the given workload
"""
if (self.job_id is None):
self.create_job_id()
+ workload_name = workload.fullname
+
with db_mutex:
db = sqlite3.connect(JobDB.db_name)
@@ -174,8 +178,6 @@ class JobDB(object):
workload_prefix = workload_prefix + "%"
- stats = ()
-
start_time = str(calendar.timegm(time.gmtime()))
end_time = "0"
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index 5e30a76..fad2546 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -8,7 +8,6 @@
##############################################################################
from threading import Thread
-import cmd
import json
import logging
import subprocess
@@ -48,17 +47,17 @@ class FIOInvoker(object):
self.json_body += line
try:
if line == "}\n":
- self.logger.debug(
- "Have a json snippet: %s", self.json_body)
json_metric = json.loads(self.json_body)
self.json_body = ""
for event_listener in self.event_listeners:
- event_listener(self.callback_id, json_metric)
-
+ try:
+ event_listener(self.callback_id, json_metric)
+ except Exception, e:
+ self.logger.error("Notifying listener %s: %s",
+ self.callback_id, e)
except Exception, e:
self.logger.error("Error parsing JSON: %s", e)
- pass
except ValueError:
pass # We might have read from the closed socket, ignore it
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index a467aef..b4fef7f 100644
--- a/storperf/storperf_master.py
+++ b/storperf/storperf_master.py
@@ -202,7 +202,6 @@ class StorPerfMaster(object):
parameters=self._make_parameters())
self.stack_id = stack['stack']['id']
- pass
def validate_stack(self):
self._attach_to_openstack()
@@ -232,8 +231,6 @@ class StorPerfMaster(object):
self._heat_client.stacks.delete(stack_id=self.stack_id)
self.stack_id = None
- pass
-
def execute_workloads(self):
if (self.stack_id is None):
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 734b514..497d17c 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -14,6 +14,7 @@ from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from threading import Thread
+import copy
import imp
import logging
import os
@@ -37,6 +38,7 @@ class TestExecutor(object):
self.prefix = None
self.job_db = JobDB()
self._slaves = []
+ self._workload_thread = None
@property
def slaves(self):
@@ -58,22 +60,9 @@ class TestExecutor(object):
metric,
callback_id)
- read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
- write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
- read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
- write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
-
- message = "Average Latency us Read/Write: " + read_latency \
- + "/" + write_latency + " IOPS r/w: " + \
- read_iops + "/" + write_iops
-
- for event_listener in self.event_listeners:
- event_listener(message)
-
self.metrics_emitter.transmit_metrics(carbon_metrics)
def register_workloads(self, workloads):
-
if (workloads is None or len(workloads) == 0):
workload_dir = os.path.normpath(
os.path.join(os.path.dirname(__file__), "workloads"))
@@ -124,66 +113,59 @@ class TestExecutor(object):
return None
def execute(self):
-
self.job_db.create_job_id()
- for slave in self.slaves:
- t = Thread(target=self.execute_on_node, args=(slave,))
- t.daemon = False
- t.start()
-
+ self._workload_thread = Thread(target=self.execute_workloads, args=())
+ self._workload_thread.start()
return self.job_db.job_id
- def execute_on_node(self, remote_host):
-
- logger = logging.getLogger(__name__ + ":" + remote_host)
-
- invoker = FIOInvoker()
- invoker.remote_host = remote_host
- invoker.register(self.event)
-
- logger.info(
- "Starting job " + self.job_db.job_id + " on " + remote_host)
-
+ def execute_workloads(self):
for workload_module in self.workload_modules:
-
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
- logger.debug(
- "Found workload: " + str(constructorMethod))
workload = constructorMethod()
if (self.filename is not None):
workload.filename = self.filename
- workload.invoker = invoker
if (workload_name.startswith("_")):
iodepths = [2, ]
- blocksizes = [65536, ]
+ blocksizes = [8192, ]
else:
iodepths = [1, 16, 128]
- blocksizes = [4096, 65536, 1048576]
+ blocksizes = [8192, 4096, 512]
+
+ workload.id = self.job_db.job_id
for blocksize in blocksizes:
for iodepth in iodepths:
-
- full_workload_name = workload_name + \
- ".host." + remote_host + \
- ".queue-depth." + str(iodepth) + \
- ".block-size." + str(blocksize)
-
workload.options['iodepth'] = str(iodepth)
workload.options['bs'] = str(blocksize)
- self.logger.info(
- "Executing workload: " + full_workload_name)
- invoker.callback_id = self.job_db.job_id + \
- "." + full_workload_name
+ slave_threads = []
+ for slave in self.slaves:
+ slave_workload = copy.copy(workload)
+ slave_workload.remote_host = slave
+ t = Thread(target=self.execute_on_node,
+ args=(slave_workload,))
+ t.daemon = False
+ t.start()
+ slave_threads.append(t)
+
+ for slave_thread in slave_threads:
+ slave_thread.join()
+
+ def execute_on_node(self, workload):
+
+ invoker = FIOInvoker()
+ invoker.register(self.event)
+ workload.invoker = invoker
+
+ self.logger.info("Starting " + workload.fullname)
- self.job_db.start_workload(full_workload_name)
- workload.execute()
- self.job_db.end_workload(full_workload_name)
+ self.job_db.start_workload(workload)
+ workload.execute()
+ self.job_db.end_workload(workload)
- logger.info(
- "Finished job " + self.job_db.job_id + " on " + remote_host)
+ self.logger.info("Ended " + workload.fullname)
def fetch_results(self, job, workload_name=""):
self.job_db.job_id = job
diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py
index 4620412..92b1482 100644
--- a/storperf/tests/db_tests/job_db_test.py
+++ b/storperf/tests/db_tests/job_db_test.py
@@ -8,6 +8,7 @@
##############################################################################
from storperf.db.job_db import JobDB
+from storperf.workloads.rr import rr
import os
import sqlite3
import unittest
@@ -60,7 +61,7 @@ class JobDBTest(unittest.TestCase):
start_time = "12345"
mock_calendar.side_effect = (start_time,)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
@@ -69,26 +70,26 @@ class JobDBTest(unittest.TestCase):
"""select * from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
self.assertEqual(None,
row.fetchone(),
"Should not have been a row in the db")
- self.job.start_workload(workload_name)
+ self.job.start_workload(workload)
cursor.execute(
"""select job_id, workload, start from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
@mock.patch("uuid.uuid4")
@@ -99,10 +100,10 @@ class JobDBTest(unittest.TestCase):
end_time = "54321"
mock_calendar.side_effect = (start_time, end_time,)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
- self.job.start_workload(workload_name)
- self.job.end_workload(workload_name)
+ self.job.start_workload(workload)
+ self.job.end_workload(workload)
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
@@ -110,14 +111,14 @@ class JobDBTest(unittest.TestCase):
"""select job_id, workload, start, end from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
self.assertEqual(end_time, row[3], "Did not expect " + str(row[3]))
@@ -130,26 +131,26 @@ class JobDBTest(unittest.TestCase):
mock_calendar.side_effect = (start_time_1, start_time_2)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
- self.job.start_workload(workload_name)
- self.job.start_workload(workload_name)
+ self.job.start_workload(workload)
+ self.job.start_workload(workload)
cursor.execute(
"""select job_id, workload, start from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2]))
@mock.patch("uuid.uuid4")
@@ -160,9 +161,9 @@ class JobDBTest(unittest.TestCase):
end_time = "54321"
mock_calendar.side_effect = (start_time, end_time,)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
- self.job.end_workload(workload_name)
+ self.job.end_workload(workload)
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
@@ -170,14 +171,14 @@ class JobDBTest(unittest.TestCase):
"""select job_id, workload, start, end from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
# The start time is set to the same time as end if it was never set
# before
self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
diff --git a/storperf/tests/workload_tests/workload_subclass_test.py b/storperf/tests/workload_tests/workload_subclass_test.py
new file mode 100644
index 0000000..97b6b46
--- /dev/null
+++ b/storperf/tests/workload_tests/workload_subclass_test.py
@@ -0,0 +1,54 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+from storperf.workloads.rr import rr
+from storperf.workloads.rs import rs
+from storperf.workloads.rw import rw
+from storperf.workloads.wr import wr
+from storperf.workloads.ws import ws
+
+
+class WorkloadSubclassTest(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def test_local_name(self):
+ workload = rr()
+ self.assertEqual(workload.fullname,
+ "None.rr.None.queue-depth.1.block-size.64k",
+ workload.fullname)
+
+ def test_remote_name(self):
+ workload = rw()
+ workload.remote_host = "192.168.0.1"
+ self.assertEqual(workload.fullname,
+ "None.rw.192-168-0-1.queue-depth.1.block-size.64k",
+ workload.fullname)
+
+ def test_blocksize(self):
+ workload = rs()
+ workload.options["bs"] = "4k"
+ self.assertEqual(workload.fullname,
+ "None.rs.None.queue-depth.1.block-size.4k",
+ workload.fullname)
+
+ def test_queue_depth(self):
+ workload = wr()
+ workload.options["iodepth"] = "8"
+ self.assertEqual(workload.fullname,
+ "None.wr.None.queue-depth.8.block-size.64k",
+ workload.fullname)
+
+ def test_id(self):
+ workload = ws()
+ workload.id = "workloadid"
+ self.assertEqual(workload.fullname,
+ "workloadid.ws.None.queue-depth.1.block-size.64k",
+ workload.fullname)
diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py
index f7c14ad..4eccc08 100644
--- a/storperf/workloads/_base_workload.py
+++ b/storperf/workloads/_base_workload.py
@@ -13,7 +13,7 @@ import logging
class _base_workload(object):
def __init__(self):
- self.logger = logging.getLogger(__name__)
+ self.logger = logging.getLogger(self.__class__.__name__)
self.default_filesize = "100%"
self.filename = '/dev/vdb'
self.options = {
@@ -25,12 +25,19 @@ class _base_workload(object):
'numjobs': '1',
'loops': '2',
'output-format': 'json',
- 'status-interval': '600'
+ 'status-interval': '60'
}
self.invoker = None
+ self.remote_host = None
+ self.id = None
def execute(self):
+ if self.invoker is None:
+ raise ValueError("No invoker has been set")
+
args = []
+ self.invoker.remote_host = self.remote_host
+ self.invoker.callback_id = self.fullname
if self.filename.startswith("/dev"):
self.options['size'] = "100%"
@@ -52,3 +59,19 @@ class _base_workload(object):
def setup(self):
pass
+
+ @property
+ def remote_host(self):
+ return str(self._remote_host)
+
+ @remote_host.setter
+ def remote_host(self, value):
+ self._remote_host = value
+
+ @property
+ def fullname(self):
+ return str(self.id) + "." + \
+ self.__class__.__name__ + "." + \
+ str(self.remote_host).replace(".", "-") + \
+ ".queue-depth." + str(self.options['iodepth']) + \
+ ".block-size." + str(self.options['bs'])