diff options
-rwxr-xr-x | ci/verify.sh | 3 | ||||
-rw-r--r-- | cli.py | 6 | ||||
-rw-r--r-- | rest_server.py | 4 | ||||
-rw-r--r-- | storperf/carbon/emitter.py | 8 | ||||
-rw-r--r-- | storperf/db/job_db.py | 10 | ||||
-rw-r--r-- | storperf/fio/fio_invoker.py | 11 | ||||
-rw-r--r-- | storperf/storperf_master.py | 3 | ||||
-rw-r--r-- | storperf/test_executor.py | 84 | ||||
-rw-r--r-- | storperf/tests/db_tests/job_db_test.py | 39 | ||||
-rw-r--r-- | storperf/tests/workload_tests/workload_subclass_test.py | 54 | ||||
-rw-r--r-- | storperf/workloads/_base_workload.py | 27 |
11 files changed, 154 insertions, 95 deletions
diff --git a/ci/verify.sh b/ci/verify.sh index 2f67e94..70ecb6a 100755 --- a/ci/verify.sh +++ b/ci/verify.sh @@ -19,6 +19,8 @@ virtualenv $WORKSPACE/storperf_venv source $WORKSPACE/storperf_venv/bin/activate pip install setuptools +pip install autoflake=00.6.6 +pip install autopep8==1.2.2 pip install coverage==4.0.3 pip install flask==0.10 pip install flask-restful==0.3.5 @@ -27,6 +29,7 @@ pip install flake8==2.5.4 pip install html2text==2016.1.8 pip install mock==1.3.0 pip install nose==1.3.7 +pip install pysqlite==2.8.2 pip install python-cinderclient==1.6.0 pip install python-glanceclient==1.1.0 pip install python-heatclient==0.8.0 @@ -10,7 +10,6 @@ """ from storperf.storperf_master import StorPerfMaster -from storperf.test_executor import UnknownWorkload from threading import Thread import cPickle import getopt @@ -18,13 +17,10 @@ import json import logging import logging.config import logging.handlers -import os import socket import struct import sys -import time -import html2text import requests @@ -49,7 +45,7 @@ class LogRecordStreamHandler(object): while True: datagram = self.socket.recv(8192) chunk = datagram[0:4] - slen = struct.unpack(">L", chunk)[0] + struct.unpack(">L", chunk)[0] chunk = datagram[4:] obj = cPickle.loads(chunk) record = logging.makeLogRecord(obj) diff --git a/rest_server.py b/rest_server.py index 073004a..ffb750e 100644 --- a/rest_server.py +++ b/rest_server.py @@ -62,7 +62,6 @@ class Configure(Resource): storperf.delete_stack() except Exception as e: abort(400, str(e)) - pass class StartJob(Resource): @@ -87,6 +86,9 @@ class StartJob(Resource): storperf.workloads = request.json['workload'] else: storperf.workloads = None + # Add block size, queue depth, number of passes here. + if ('workload' in request.json): + storperf.workloads = request.json['workload'] job_id = storperf.execute_workloads() diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py index 8a9f734..6104fd4 100644 --- a/storperf/carbon/emitter.py +++ b/storperf/carbon/emitter.py @@ -26,12 +26,12 @@ class CarbonMetricTransmitter(): else: timestamp = str(calendar.timegm(time.gmtime())) - self.carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.carbon_socket.connect((self.carbon_host, self.carbon_port)) + carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + carbon_socket.connect((self.carbon_host, self.carbon_port)) for key, metric in metrics.items(): message = key + " " + metric + " " + timestamp self.logger.debug("Metric: " + message) - self.carbon_socket.send(message + '\n') + carbon_socket.send(message + '\n') - self.carbon_socket.close() + carbon_socket.close() diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py index 57c82cb..8aa4c11 100644 --- a/storperf/db/job_db.py +++ b/storperf/db/job_db.py @@ -76,11 +76,13 @@ class JobDB(object): db.commit() db.close() - def start_workload(self, workload_name): + def start_workload(self, workload): """ Records the start time for the given workload """ + workload_name = workload.fullname + if (self.job_id is None): self.create_job_id() @@ -122,13 +124,15 @@ class JobDB(object): db.commit() db.close() - def end_workload(self, workload_name): + def end_workload(self, workload): """ Records the end time for the given workload """ if (self.job_id is None): self.create_job_id() + workload_name = workload.fullname + with db_mutex: db = sqlite3.connect(JobDB.db_name) @@ -174,8 +178,6 @@ class JobDB(object): workload_prefix = workload_prefix + "%" - stats = () - start_time = str(calendar.timegm(time.gmtime())) end_time = "0" diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index 5e30a76..fad2546 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -8,7 +8,6 @@ ############################################################################## from threading import Thread -import cmd import json import logging import subprocess @@ -48,17 +47,17 @@ class FIOInvoker(object): self.json_body += line try: if line == "}\n": - self.logger.debug( - "Have a json snippet: %s", self.json_body) json_metric = json.loads(self.json_body) self.json_body = "" for event_listener in self.event_listeners: - event_listener(self.callback_id, json_metric) - + try: + event_listener(self.callback_id, json_metric) + except Exception, e: + self.logger.error("Notifying listener %s: %s", + self.callback_id, e) except Exception, e: self.logger.error("Error parsing JSON: %s", e) - pass except ValueError: pass # We might have read from the closed socket, ignore it diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py index a467aef..b4fef7f 100644 --- a/storperf/storperf_master.py +++ b/storperf/storperf_master.py @@ -202,7 +202,6 @@ class StorPerfMaster(object): parameters=self._make_parameters()) self.stack_id = stack['stack']['id'] - pass def validate_stack(self): self._attach_to_openstack() @@ -232,8 +231,6 @@ class StorPerfMaster(object): self._heat_client.stacks.delete(stack_id=self.stack_id) self.stack_id = None - pass - def execute_workloads(self): if (self.stack_id is None): diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 734b514..497d17c 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -14,6 +14,7 @@ from storperf.carbon.emitter import CarbonMetricTransmitter from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker from threading import Thread +import copy import imp import logging import os @@ -37,6 +38,7 @@ class TestExecutor(object): self.prefix = None self.job_db = JobDB() self._slaves = [] + self._workload_thread = None @property def slaves(self): @@ -58,22 +60,9 @@ class TestExecutor(object): metric, callback_id) - read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"] - write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"] - read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"] - write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"] - - message = "Average Latency us Read/Write: " + read_latency \ - + "/" + write_latency + " IOPS r/w: " + \ - read_iops + "/" + write_iops - - for event_listener in self.event_listeners: - event_listener(message) - self.metrics_emitter.transmit_metrics(carbon_metrics) def register_workloads(self, workloads): - if (workloads is None or len(workloads) == 0): workload_dir = os.path.normpath( os.path.join(os.path.dirname(__file__), "workloads")) @@ -124,66 +113,59 @@ class TestExecutor(object): return None def execute(self): - self.job_db.create_job_id() - for slave in self.slaves: - t = Thread(target=self.execute_on_node, args=(slave,)) - t.daemon = False - t.start() - + self._workload_thread = Thread(target=self.execute_workloads, args=()) + self._workload_thread.start() return self.job_db.job_id - def execute_on_node(self, remote_host): - - logger = logging.getLogger(__name__ + ":" + remote_host) - - invoker = FIOInvoker() - invoker.remote_host = remote_host - invoker.register(self.event) - - logger.info( - "Starting job " + self.job_db.job_id + " on " + remote_host) - + def execute_workloads(self): for workload_module in self.workload_modules: - workload_name = getattr(workload_module, "__name__") constructorMethod = getattr(workload_module, workload_name) - logger.debug( - "Found workload: " + str(constructorMethod)) workload = constructorMethod() if (self.filename is not None): workload.filename = self.filename - workload.invoker = invoker if (workload_name.startswith("_")): iodepths = [2, ] - blocksizes = [65536, ] + blocksizes = [8192, ] else: iodepths = [1, 16, 128] - blocksizes = [4096, 65536, 1048576] + blocksizes = [8192, 4096, 512] + + workload.id = self.job_db.job_id for blocksize in blocksizes: for iodepth in iodepths: - - full_workload_name = workload_name + \ - ".host." + remote_host + \ - ".queue-depth." + str(iodepth) + \ - ".block-size." + str(blocksize) - workload.options['iodepth'] = str(iodepth) workload.options['bs'] = str(blocksize) - self.logger.info( - "Executing workload: " + full_workload_name) - invoker.callback_id = self.job_db.job_id + \ - "." + full_workload_name + slave_threads = [] + for slave in self.slaves: + slave_workload = copy.copy(workload) + slave_workload.remote_host = slave + t = Thread(target=self.execute_on_node, + args=(slave_workload,)) + t.daemon = False + t.start() + slave_threads.append(t) + + for slave_thread in slave_threads: + slave_thread.join() + + def execute_on_node(self, workload): + + invoker = FIOInvoker() + invoker.register(self.event) + workload.invoker = invoker + + self.logger.info("Starting " + workload.fullname) - self.job_db.start_workload(full_workload_name) - workload.execute() - self.job_db.end_workload(full_workload_name) + self.job_db.start_workload(workload) + workload.execute() + self.job_db.end_workload(workload) - logger.info( - "Finished job " + self.job_db.job_id + " on " + remote_host) + self.logger.info("Ended " + workload.fullname) def fetch_results(self, job, workload_name=""): self.job_db.job_id = job diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py index 4620412..92b1482 100644 --- a/storperf/tests/db_tests/job_db_test.py +++ b/storperf/tests/db_tests/job_db_test.py @@ -8,6 +8,7 @@ ############################################################################## from storperf.db.job_db import JobDB +from storperf.workloads.rr import rr import os import sqlite3 import unittest @@ -60,7 +61,7 @@ class JobDBTest(unittest.TestCase): start_time = "12345" mock_calendar.side_effect = (start_time,) mock_uuid.side_effect = (job_id,) - workload_name = "Workload" + workload = rr() db = sqlite3.connect(JobDB.db_name) cursor = db.cursor() @@ -69,26 +70,26 @@ class JobDBTest(unittest.TestCase): """select * from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) self.assertEqual(None, row.fetchone(), "Should not have been a row in the db") - self.job.start_workload(workload_name) + self.job.start_workload(workload) cursor.execute( """select job_id, workload, start from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) row = cursor.fetchone() self.assertNotEqual(None, row, "Should be a row in the db") self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) self.assertEqual( - workload_name, row[1], "Did not expect " + str(row[1])) + workload.fullname, row[1], "Did not expect " + str(row[1])) self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) @mock.patch("uuid.uuid4") @@ -99,10 +100,10 @@ class JobDBTest(unittest.TestCase): end_time = "54321" mock_calendar.side_effect = (start_time, end_time,) mock_uuid.side_effect = (job_id,) - workload_name = "Workload" + workload = rr() - self.job.start_workload(workload_name) - self.job.end_workload(workload_name) + self.job.start_workload(workload) + self.job.end_workload(workload) db = sqlite3.connect(JobDB.db_name) cursor = db.cursor() @@ -110,14 +111,14 @@ class JobDBTest(unittest.TestCase): """select job_id, workload, start, end from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) row = cursor.fetchone() self.assertNotEqual(None, row, "Should be a row in the db") self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) self.assertEqual( - workload_name, row[1], "Did not expect " + str(row[1])) + workload.fullname, row[1], "Did not expect " + str(row[1])) self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) self.assertEqual(end_time, row[3], "Did not expect " + str(row[3])) @@ -130,26 +131,26 @@ class JobDBTest(unittest.TestCase): mock_calendar.side_effect = (start_time_1, start_time_2) mock_uuid.side_effect = (job_id,) - workload_name = "Workload" + workload = rr() db = sqlite3.connect(JobDB.db_name) cursor = db.cursor() - self.job.start_workload(workload_name) - self.job.start_workload(workload_name) + self.job.start_workload(workload) + self.job.start_workload(workload) cursor.execute( """select job_id, workload, start from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) row = cursor.fetchone() self.assertNotEqual(None, row, "Should be a row in the db") self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) self.assertEqual( - workload_name, row[1], "Did not expect " + str(row[1])) + workload.fullname, row[1], "Did not expect " + str(row[1])) self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2])) @mock.patch("uuid.uuid4") @@ -160,9 +161,9 @@ class JobDBTest(unittest.TestCase): end_time = "54321" mock_calendar.side_effect = (start_time, end_time,) mock_uuid.side_effect = (job_id,) - workload_name = "Workload" + workload = rr() - self.job.end_workload(workload_name) + self.job.end_workload(workload) db = sqlite3.connect(JobDB.db_name) cursor = db.cursor() @@ -170,14 +171,14 @@ class JobDBTest(unittest.TestCase): """select job_id, workload, start, end from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) row = cursor.fetchone() self.assertNotEqual(None, row, "Should be a row in the db") self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) self.assertEqual( - workload_name, row[1], "Did not expect " + str(row[1])) + workload.fullname, row[1], "Did not expect " + str(row[1])) # The start time is set to the same time as end if it was never set # before self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) diff --git a/storperf/tests/workload_tests/workload_subclass_test.py b/storperf/tests/workload_tests/workload_subclass_test.py new file mode 100644 index 0000000..97b6b46 --- /dev/null +++ b/storperf/tests/workload_tests/workload_subclass_test.py @@ -0,0 +1,54 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest +from storperf.workloads.rr import rr +from storperf.workloads.rs import rs +from storperf.workloads.rw import rw +from storperf.workloads.wr import wr +from storperf.workloads.ws import ws + + +class WorkloadSubclassTest(unittest.TestCase): + + def setUp(self): + pass + + def test_local_name(self): + workload = rr() + self.assertEqual(workload.fullname, + "None.rr.None.queue-depth.1.block-size.64k", + workload.fullname) + + def test_remote_name(self): + workload = rw() + workload.remote_host = "192.168.0.1" + self.assertEqual(workload.fullname, + "None.rw.192-168-0-1.queue-depth.1.block-size.64k", + workload.fullname) + + def test_blocksize(self): + workload = rs() + workload.options["bs"] = "4k" + self.assertEqual(workload.fullname, + "None.rs.None.queue-depth.1.block-size.4k", + workload.fullname) + + def test_queue_depth(self): + workload = wr() + workload.options["iodepth"] = "8" + self.assertEqual(workload.fullname, + "None.wr.None.queue-depth.8.block-size.64k", + workload.fullname) + + def test_id(self): + workload = ws() + workload.id = "workloadid" + self.assertEqual(workload.fullname, + "workloadid.ws.None.queue-depth.1.block-size.64k", + workload.fullname) diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py index f7c14ad..4eccc08 100644 --- a/storperf/workloads/_base_workload.py +++ b/storperf/workloads/_base_workload.py @@ -13,7 +13,7 @@ import logging class _base_workload(object): def __init__(self): - self.logger = logging.getLogger(__name__) + self.logger = logging.getLogger(self.__class__.__name__) self.default_filesize = "100%" self.filename = '/dev/vdb' self.options = { @@ -25,12 +25,19 @@ class _base_workload(object): 'numjobs': '1', 'loops': '2', 'output-format': 'json', - 'status-interval': '600' + 'status-interval': '60' } self.invoker = None + self.remote_host = None + self.id = None def execute(self): + if self.invoker is None: + raise ValueError("No invoker has been set") + args = [] + self.invoker.remote_host = self.remote_host + self.invoker.callback_id = self.fullname if self.filename.startswith("/dev"): self.options['size'] = "100%" @@ -52,3 +59,19 @@ class _base_workload(object): def setup(self): pass + + @property + def remote_host(self): + return str(self._remote_host) + + @remote_host.setter + def remote_host(self, value): + self._remote_host = value + + @property + def fullname(self): + return str(self.id) + "." + \ + self.__class__.__name__ + "." + \ + str(self.remote_host).replace(".", "-") + \ + ".queue-depth." + str(self.options['iodepth']) + \ + ".block-size." + str(self.options['bs']) |