From 311eee3bec00d5acc32b6eba76a7ff0d1990f4b2 Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Mon, 25 Apr 2016 09:55:03 -0400 Subject: Job run lifecycle rework Change the way slave jobs are managed so that they are in step with each other, and we can track the overall thread that is running them. This lays groundwork for STORPERF-20 and STORPERF-44 JIRA: STORPERF-33 STORPERF-43 Change-Id: Iaff48a2823ba85d6512e9782fd9091a72639835c Signed-off-by: Mark Beierl --- ci/verify.sh | 3 + cli.py | 6 +- rest_server.py | 4 +- storperf/carbon/emitter.py | 8 +-- storperf/db/job_db.py | 10 +-- storperf/fio/fio_invoker.py | 11 ++- storperf/storperf_master.py | 3 - storperf/test_executor.py | 84 +++++++++------------- storperf/tests/db_tests/job_db_test.py | 39 +++++----- .../tests/workload_tests/workload_subclass_test.py | 54 ++++++++++++++ storperf/workloads/_base_workload.py | 27 ++++++- 11 files changed, 154 insertions(+), 95 deletions(-) create mode 100644 storperf/tests/workload_tests/workload_subclass_test.py diff --git a/ci/verify.sh b/ci/verify.sh index 2f67e94..70ecb6a 100755 --- a/ci/verify.sh +++ b/ci/verify.sh @@ -19,6 +19,8 @@ virtualenv $WORKSPACE/storperf_venv source $WORKSPACE/storperf_venv/bin/activate pip install setuptools +pip install autoflake=00.6.6 +pip install autopep8==1.2.2 pip install coverage==4.0.3 pip install flask==0.10 pip install flask-restful==0.3.5 @@ -27,6 +29,7 @@ pip install flake8==2.5.4 pip install html2text==2016.1.8 pip install mock==1.3.0 pip install nose==1.3.7 +pip install pysqlite==2.8.2 pip install python-cinderclient==1.6.0 pip install python-glanceclient==1.1.0 pip install python-heatclient==0.8.0 diff --git a/cli.py b/cli.py index 560d77d..5595314 100644 --- a/cli.py +++ b/cli.py @@ -10,7 +10,6 @@ """ from storperf.storperf_master import StorPerfMaster -from storperf.test_executor import UnknownWorkload from threading import Thread import cPickle import getopt @@ -18,13 +17,10 @@ import json import logging import logging.config import logging.handlers -import os import socket import struct import sys -import time -import html2text import requests @@ -49,7 +45,7 @@ class LogRecordStreamHandler(object): while True: datagram = self.socket.recv(8192) chunk = datagram[0:4] - slen = struct.unpack(">L", chunk)[0] + struct.unpack(">L", chunk)[0] chunk = datagram[4:] obj = cPickle.loads(chunk) record = logging.makeLogRecord(obj) diff --git a/rest_server.py b/rest_server.py index 073004a..ffb750e 100644 --- a/rest_server.py +++ b/rest_server.py @@ -62,7 +62,6 @@ class Configure(Resource): storperf.delete_stack() except Exception as e: abort(400, str(e)) - pass class StartJob(Resource): @@ -87,6 +86,9 @@ class StartJob(Resource): storperf.workloads = request.json['workload'] else: storperf.workloads = None + # Add block size, queue depth, number of passes here. + if ('workload' in request.json): + storperf.workloads = request.json['workload'] job_id = storperf.execute_workloads() diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py index 8a9f734..6104fd4 100644 --- a/storperf/carbon/emitter.py +++ b/storperf/carbon/emitter.py @@ -26,12 +26,12 @@ class CarbonMetricTransmitter(): else: timestamp = str(calendar.timegm(time.gmtime())) - self.carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.carbon_socket.connect((self.carbon_host, self.carbon_port)) + carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + carbon_socket.connect((self.carbon_host, self.carbon_port)) for key, metric in metrics.items(): message = key + " " + metric + " " + timestamp self.logger.debug("Metric: " + message) - self.carbon_socket.send(message + '\n') + carbon_socket.send(message + '\n') - self.carbon_socket.close() + carbon_socket.close() diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py index 57c82cb..8aa4c11 100644 --- a/storperf/db/job_db.py +++ b/storperf/db/job_db.py @@ -76,11 +76,13 @@ class JobDB(object): db.commit() db.close() - def start_workload(self, workload_name): + def start_workload(self, workload): """ Records the start time for the given workload """ + workload_name = workload.fullname + if (self.job_id is None): self.create_job_id() @@ -122,13 +124,15 @@ class JobDB(object): db.commit() db.close() - def end_workload(self, workload_name): + def end_workload(self, workload): """ Records the end time for the given workload """ if (self.job_id is None): self.create_job_id() + workload_name = workload.fullname + with db_mutex: db = sqlite3.connect(JobDB.db_name) @@ -174,8 +178,6 @@ class JobDB(object): workload_prefix = workload_prefix + "%" - stats = () - start_time = str(calendar.timegm(time.gmtime())) end_time = "0" diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index 5e30a76..fad2546 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -8,7 +8,6 @@ ############################################################################## from threading import Thread -import cmd import json import logging import subprocess @@ -48,17 +47,17 @@ class FIOInvoker(object): self.json_body += line try: if line == "}\n": - self.logger.debug( - "Have a json snippet: %s", self.json_body) json_metric = json.loads(self.json_body) self.json_body = "" for event_listener in self.event_listeners: - event_listener(self.callback_id, json_metric) - + try: + event_listener(self.callback_id, json_metric) + except Exception, e: + self.logger.error("Notifying listener %s: %s", + self.callback_id, e) except Exception, e: self.logger.error("Error parsing JSON: %s", e) - pass except ValueError: pass # We might have read from the closed socket, ignore it diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py index a467aef..b4fef7f 100644 --- a/storperf/storperf_master.py +++ b/storperf/storperf_master.py @@ -202,7 +202,6 @@ class StorPerfMaster(object): parameters=self._make_parameters()) self.stack_id = stack['stack']['id'] - pass def validate_stack(self): self._attach_to_openstack() @@ -232,8 +231,6 @@ class StorPerfMaster(object): self._heat_client.stacks.delete(stack_id=self.stack_id) self.stack_id = None - pass - def execute_workloads(self): if (self.stack_id is None): diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 734b514..497d17c 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -14,6 +14,7 @@ from storperf.carbon.emitter import CarbonMetricTransmitter from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker from threading import Thread +import copy import imp import logging import os @@ -37,6 +38,7 @@ class TestExecutor(object): self.prefix = None self.job_db = JobDB() self._slaves = [] + self._workload_thread = None @property def slaves(self): @@ -58,22 +60,9 @@ class TestExecutor(object): metric, callback_id) - read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"] - write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"] - read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"] - write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"] - - message = "Average Latency us Read/Write: " + read_latency \ - + "/" + write_latency + " IOPS r/w: " + \ - read_iops + "/" + write_iops - - for event_listener in self.event_listeners: - event_listener(message) - self.metrics_emitter.transmit_metrics(carbon_metrics) def register_workloads(self, workloads): - if (workloads is None or len(workloads) == 0): workload_dir = os.path.normpath( os.path.join(os.path.dirname(__file__), "workloads")) @@ -124,66 +113,59 @@ class TestExecutor(object): return None def execute(self): - self.job_db.create_job_id() - for slave in self.slaves: - t = Thread(target=self.execute_on_node, args=(slave,)) - t.daemon = False - t.start() - + self._workload_thread = Thread(target=self.execute_workloads, args=()) + self._workload_thread.start() return self.job_db.job_id - def execute_on_node(self, remote_host): - - logger = logging.getLogger(__name__ + ":" + remote_host) - - invoker = FIOInvoker() - invoker.remote_host = remote_host - invoker.register(self.event) - - logger.info( - "Starting job " + self.job_db.job_id + " on " + remote_host) - + def execute_workloads(self): for workload_module in self.workload_modules: - workload_name = getattr(workload_module, "__name__") constructorMethod = getattr(workload_module, workload_name) - logger.debug( - "Found workload: " + str(constructorMethod)) workload = constructorMethod() if (self.filename is not None): workload.filename = self.filename - workload.invoker = invoker if (workload_name.startswith("_")): iodepths = [2, ] - blocksizes = [65536, ] + blocksizes = [8192, ] else: iodepths = [1, 16, 128] - blocksizes = [4096, 65536, 1048576] + blocksizes = [8192, 4096, 512] + + workload.id = self.job_db.job_id for blocksize in blocksizes: for iodepth in iodepths: - - full_workload_name = workload_name + \ - ".host." + remote_host + \ - ".queue-depth." + str(iodepth) + \ - ".block-size." + str(blocksize) - workload.options['iodepth'] = str(iodepth) workload.options['bs'] = str(blocksize) - self.logger.info( - "Executing workload: " + full_workload_name) - invoker.callback_id = self.job_db.job_id + \ - "." + full_workload_name + slave_threads = [] + for slave in self.slaves: + slave_workload = copy.copy(workload) + slave_workload.remote_host = slave + t = Thread(target=self.execute_on_node, + args=(slave_workload,)) + t.daemon = False + t.start() + slave_threads.append(t) + + for slave_thread in slave_threads: + slave_thread.join() + + def execute_on_node(self, workload): + + invoker = FIOInvoker() + invoker.register(self.event) + workload.invoker = invoker + + self.logger.info("Starting " + workload.fullname) - self.job_db.start_workload(full_workload_name) - workload.execute() - self.job_db.end_workload(full_workload_name) + self.job_db.start_workload(workload) + workload.execute() + self.job_db.end_workload(workload) - logger.info( - "Finished job " + self.job_db.job_id + " on " + remote_host) + self.logger.info("Ended " + workload.fullname) def fetch_results(self, job, workload_name=""): self.job_db.job_id = job diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py index 4620412..92b1482 100644 --- a/storperf/tests/db_tests/job_db_test.py +++ b/storperf/tests/db_tests/job_db_test.py @@ -8,6 +8,7 @@ ############################################################################## from storperf.db.job_db import JobDB +from storperf.workloads.rr import rr import os import sqlite3 import unittest @@ -60,7 +61,7 @@ class JobDBTest(unittest.TestCase): start_time = "12345" mock_calendar.side_effect = (start_time,) mock_uuid.side_effect = (job_id,) - workload_name = "Workload" + workload = rr() db = sqlite3.connect(JobDB.db_name) cursor = db.cursor() @@ -69,26 +70,26 @@ class JobDBTest(unittest.TestCase): """select * from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) self.assertEqual(None, row.fetchone(), "Should not have been a row in the db") - self.job.start_workload(workload_name) + self.job.start_workload(workload) cursor.execute( """select job_id, workload, start from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) row = cursor.fetchone() self.assertNotEqual(None, row, "Should be a row in the db") self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) self.assertEqual( - workload_name, row[1], "Did not expect " + str(row[1])) + workload.fullname, row[1], "Did not expect " + str(row[1])) self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) @mock.patch("uuid.uuid4") @@ -99,10 +100,10 @@ class JobDBTest(unittest.TestCase): end_time = "54321" mock_calendar.side_effect = (start_time, end_time,) mock_uuid.side_effect = (job_id,) - workload_name = "Workload" + workload = rr() - self.job.start_workload(workload_name) - self.job.end_workload(workload_name) + self.job.start_workload(workload) + self.job.end_workload(workload) db = sqlite3.connect(JobDB.db_name) cursor = db.cursor() @@ -110,14 +111,14 @@ class JobDBTest(unittest.TestCase): """select job_id, workload, start, end from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) row = cursor.fetchone() self.assertNotEqual(None, row, "Should be a row in the db") self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) self.assertEqual( - workload_name, row[1], "Did not expect " + str(row[1])) + workload.fullname, row[1], "Did not expect " + str(row[1])) self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) self.assertEqual(end_time, row[3], "Did not expect " + str(row[3])) @@ -130,26 +131,26 @@ class JobDBTest(unittest.TestCase): mock_calendar.side_effect = (start_time_1, start_time_2) mock_uuid.side_effect = (job_id,) - workload_name = "Workload" + workload = rr() db = sqlite3.connect(JobDB.db_name) cursor = db.cursor() - self.job.start_workload(workload_name) - self.job.start_workload(workload_name) + self.job.start_workload(workload) + self.job.start_workload(workload) cursor.execute( """select job_id, workload, start from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) row = cursor.fetchone() self.assertNotEqual(None, row, "Should be a row in the db") self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) self.assertEqual( - workload_name, row[1], "Did not expect " + str(row[1])) + workload.fullname, row[1], "Did not expect " + str(row[1])) self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2])) @mock.patch("uuid.uuid4") @@ -160,9 +161,9 @@ class JobDBTest(unittest.TestCase): end_time = "54321" mock_calendar.side_effect = (start_time, end_time,) mock_uuid.side_effect = (job_id,) - workload_name = "Workload" + workload = rr() - self.job.end_workload(workload_name) + self.job.end_workload(workload) db = sqlite3.connect(JobDB.db_name) cursor = db.cursor() @@ -170,14 +171,14 @@ class JobDBTest(unittest.TestCase): """select job_id, workload, start, end from jobs where job_id = ? and workload = ?""", - (job_id, workload_name,)) + (job_id, workload.fullname,)) row = cursor.fetchone() self.assertNotEqual(None, row, "Should be a row in the db") self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) self.assertEqual( - workload_name, row[1], "Did not expect " + str(row[1])) + workload.fullname, row[1], "Did not expect " + str(row[1])) # The start time is set to the same time as end if it was never set # before self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) diff --git a/storperf/tests/workload_tests/workload_subclass_test.py b/storperf/tests/workload_tests/workload_subclass_test.py new file mode 100644 index 0000000..97b6b46 --- /dev/null +++ b/storperf/tests/workload_tests/workload_subclass_test.py @@ -0,0 +1,54 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest +from storperf.workloads.rr import rr +from storperf.workloads.rs import rs +from storperf.workloads.rw import rw +from storperf.workloads.wr import wr +from storperf.workloads.ws import ws + + +class WorkloadSubclassTest(unittest.TestCase): + + def setUp(self): + pass + + def test_local_name(self): + workload = rr() + self.assertEqual(workload.fullname, + "None.rr.None.queue-depth.1.block-size.64k", + workload.fullname) + + def test_remote_name(self): + workload = rw() + workload.remote_host = "192.168.0.1" + self.assertEqual(workload.fullname, + "None.rw.192-168-0-1.queue-depth.1.block-size.64k", + workload.fullname) + + def test_blocksize(self): + workload = rs() + workload.options["bs"] = "4k" + self.assertEqual(workload.fullname, + "None.rs.None.queue-depth.1.block-size.4k", + workload.fullname) + + def test_queue_depth(self): + workload = wr() + workload.options["iodepth"] = "8" + self.assertEqual(workload.fullname, + "None.wr.None.queue-depth.8.block-size.64k", + workload.fullname) + + def test_id(self): + workload = ws() + workload.id = "workloadid" + self.assertEqual(workload.fullname, + "workloadid.ws.None.queue-depth.1.block-size.64k", + workload.fullname) diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py index f7c14ad..4eccc08 100644 --- a/storperf/workloads/_base_workload.py +++ b/storperf/workloads/_base_workload.py @@ -13,7 +13,7 @@ import logging class _base_workload(object): def __init__(self): - self.logger = logging.getLogger(__name__) + self.logger = logging.getLogger(self.__class__.__name__) self.default_filesize = "100%" self.filename = '/dev/vdb' self.options = { @@ -25,12 +25,19 @@ class _base_workload(object): 'numjobs': '1', 'loops': '2', 'output-format': 'json', - 'status-interval': '600' + 'status-interval': '60' } self.invoker = None + self.remote_host = None + self.id = None def execute(self): + if self.invoker is None: + raise ValueError("No invoker has been set") + args = [] + self.invoker.remote_host = self.remote_host + self.invoker.callback_id = self.fullname if self.filename.startswith("/dev"): self.options['size'] = "100%" @@ -52,3 +59,19 @@ class _base_workload(object): def setup(self): pass + + @property + def remote_host(self): + return str(self._remote_host) + + @remote_host.setter + def remote_host(self, value): + self._remote_host = value + + @property + def fullname(self): + return str(self.id) + "." + \ + self.__class__.__name__ + "." + \ + str(self.remote_host).replace(".", "-") + \ + ".queue-depth." + str(self.options['iodepth']) + \ + ".block-size." + str(self.options['bs']) -- cgit 1.2.3-korg