From f099c8aaa7aeae805f7534382bfef789894abffb Mon Sep 17 00:00:00 2001 From: mbeierl Date: Fri, 11 Dec 2015 15:31:17 -0500 Subject: Workload reporting Use a local db to track start/end times of runs so we can go back to the carbon db and summarize values at reporting time based off the raw data. Change-Id: Ie62afd339fd1c15d82bc56c93c7cba5bd4f90fe2 JIRA: STORPERF-29 Signed-off-by: mbeierl --- storperf/carbon/emitter.py | 3 +- storperf/db/__init__.py | 0 storperf/db/job_db.py | 197 +++++++++++++++++++++++++++++ storperf/fio/fio_invoker.py | 46 +++---- storperf/main.py | 20 ++- storperf/test_executor.py | 82 ++++++++---- storperf/tests/db_tests/job_db_test.py | 174 +++++++++++++++++++++++++ storperf/workloads/_ssd_preconditioning.py | 3 +- storperf/workloads/_warm_up.py | 6 +- 9 files changed, 474 insertions(+), 57 deletions(-) create mode 100644 storperf/db/__init__.py create mode 100644 storperf/db/job_db.py create mode 100644 storperf/tests/db_tests/job_db_test.py diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py index e949238..8a9f734 100644 --- a/storperf/carbon/emitter.py +++ b/storperf/carbon/emitter.py @@ -6,9 +6,8 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -import logging - import calendar +import logging import socket import time diff --git a/storperf/db/__init__.py b/storperf/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py new file mode 100644 index 0000000..a65fa78 --- /dev/null +++ b/storperf/db/job_db.py @@ -0,0 +1,197 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from _sqlite3 import OperationalError +import calendar +import logging +import sqlite3 +import time +import uuid + +import requests + + +class JobDB(object): + + db_name = "StorPerf.db" + + def __init__(self): + """ + Creates the StorPerf.db and jobs tables on demand + """ + + self.logger = logging.getLogger(__name__) + self.logger.debug("Connecting to " + JobDB.db_name) + self.db = sqlite3.connect(JobDB.db_name) + self.job_id = None + + cursor = self.db.cursor() + try: + cursor.execute('''CREATE TABLE jobs + (job_id text, + workload text, + start text, + end text)''') + self.logger.debug("Created job table") + except OperationalError: + self.logger.debug("Job table exists") + + cursor.execute('SELECT * FROM jobs') + + def create_job_id(self): + """ + Returns a job id that is guaranteed to be unique in this + StorPerf instance. + """ + cursor = self.db.cursor() + + self.job_id = str(uuid.uuid4()) + row = cursor.execute( + "select * from jobs where job_id = ?", (self.job_id,)) + + while (row.fetchone() is not None): + self.logger.info("Duplicate job id found, regenerating") + self.job_id = str(uuid.uuid4()) + row = cursor.execute( + "select * from jobs where job_id = ?", (self.job_id,)) + + cursor.execute( + "insert into jobs(job_id) values (?)", (self.job_id,)) + self.logger.debug("Reserved job id " + self.job_id) + self.db.commit() + + def start_workload(self, workload_name): + """ + Records the start time for the given workload + """ + if (self.job_id is None): + self.create_job_id() + + cursor = self.db.cursor() + now = str(calendar.timegm(time.gmtime())) + + row = cursor.execute( + """select * from jobs + where job_id = ? + and workload = ?""", + (self.job_id, workload_name,)) + + if (row.fetchone() is None): + cursor.execute( + """insert into jobs + (job_id, + workload, + start) + values (?, ?, ?)""", + (self.job_id, + workload_name, + now,)) + else: + self.logger.warn("Duplicate start time for workload " + + workload_name) + cursor.execute( + """update jobs set + job_id = ?, + start = ? + where workload = ?""", + (self.job_id, + now, + workload_name,)) + + self.db.commit() + + def end_workload(self, workload_name): + """ + Records the end time for the given workload + """ + if (self.job_id is None): + self.create_job_id() + + cursor = self.db.cursor() + now = str(calendar.timegm(time.gmtime())) + + row = cursor.execute( + """select * from jobs + where job_id = ? + and workload = ?""", + (self.job_id, workload_name,)) + + if (row.fetchone() is None): + self.logger.warn("No start time recorded for workload " + + workload_name) + cursor.execute( + """insert into jobs + (job_id, + workload, + start, + end) + values (?, ?, ?, ?)""", + (self.job_id, + workload_name, + now, + now)) + else: + cursor.execute( + """update jobs set + job_id = ?, + end = ? + where workload = ?""", + (self.job_id, + now, + workload_name,)) + + self.db.commit() + + def fetch_results(self, workload_prefix=""): + if (workload_prefix is None): + workload_prefix = "" + + workload_prefix = workload_prefix + "%" + + stats = () + + start_time = str(calendar.timegm(time.gmtime())) + end_time = "0" + + self.logger.debug("Workload like: " + workload_prefix) + + cursor = self.db.cursor() + cursor.execute("""select start, end, workload + from jobs where workload like ?""", + (workload_prefix,)) + + while (True): + row = cursor.fetchone() + if (row is None): + break + + start_time = str(row[0]) + end_time = str(row[1]) + workload = str(row[2]) + + # for most of these stats, we just want the final one + # as that is cumulative average or whatever for the whole + # run + + self.logger.info("workload=" + workload + + "start=" + start_time + " end=" + end_time) + + request = 'http://127.0.0.1:8000/render/?target=*.' + self.job_id + \ + '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \ + start_time + "&until=" + end_time + + print '\n\t' + request + '\n' + + response = requests.get(request) + + if (response.status_code == 200): + data = response.json() + print data + else: + pass diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index be1b37c..0b13349 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -7,11 +7,10 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -import subprocess import json -from threading import Thread - import logging +import subprocess +from threading import Thread class FIOInvoker(object): @@ -28,30 +27,33 @@ class FIOInvoker(object): def stdout_handler(self): self.json_body = "" - for line in iter(self.fio_process.stdout.readline, b''): - if line.startswith("fio"): - line = "" - continue - self.json_body += line - try: - if line == "}\n": - self.logger.debug( - "Have a json snippet: %s", self.json_body) - json_metric = json.loads(self.json_body) - self.json_body = "" - - for event_listener in self.event_listeners: - event_listener(json_metric) - - except Exception, e: - self.logger.error("Error parsing JSON: %s", e) - pass + try: + for line in iter(self.fio_process.stdout.readline, b''): + if line.startswith("fio"): + line = "" + continue + self.json_body += line + try: + if line == "}\n": + self.logger.debug( + "Have a json snippet: %s", self.json_body) + json_metric = json.loads(self.json_body) + self.json_body = "" + + for event_listener in self.event_listeners: + event_listener(json_metric) + + except Exception, e: + self.logger.error("Error parsing JSON: %s", e) + pass + except ValueError: + pass # We might have read from the closed socket, ignore it self.fio_process.stdout.close() def stderr_handler(self): for line in iter(self.fio_process.stderr.readline, b''): - print line + self.logger.error("FIO Error: %s", line) self.fio_process.stderr.close() diff --git a/storperf/main.py b/storperf/main.py index 2423b99..11357f4 100644 --- a/storperf/main.py +++ b/storperf/main.py @@ -52,18 +52,22 @@ def main(argv=None): setup_logging() test_executor = TestExecutor() verbose = False + debug = False workloads = None + report = None if argv is None: argv = sys.argv try: try: - opts, args = getopt.getopt(argv[1:], "t:w:scvh", + opts, args = getopt.getopt(argv[1:], "t:w:r:scvdh", ["target=", "workload=", + "report=", "nossd", "nowarm", "verbose", + "debug", "help", ]) except getopt.error, msg: @@ -75,14 +79,23 @@ def main(argv=None): return 0 elif o in ("-t", "--target"): test_executor.filename = a + elif o in ("-t", "--target"): + report = a elif o in ("-v", "--verbose"): verbose = True + elif o in ("-d", "--debug"): + debug = True elif o in ("-s", "--nossd"): test_executor.precondition = False elif o in ("-c", "--nowarm"): test_executor.warm = False elif o in ("-w", "--workload"): workloads = a.split(",") + elif o in ("-r", "--report"): + report = a + + if (debug): + logging.getLogger().setLevel(logging.DEBUG) test_executor.register_workloads(workloads) @@ -98,7 +111,10 @@ def main(argv=None): if (verbose): test_executor.register(event) - test_executor.execute() + if (report is not None): + print test_executor.fetch_results(report, workloads) + else: + test_executor.execute() if __name__ == "__main__": sys.exit(main()) diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 5fb88d4..462f06b 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -9,15 +9,15 @@ import imp import logging -import os -import socket - from os import listdir +import os from os.path import isfile, join +import socket -from fio.fio_invoker import FIOInvoker -from carbon.emitter import CarbonMetricTransmitter from carbon.converter import JSONToCarbon +from carbon.emitter import CarbonMetricTransmitter +from db.job_db import JobDB +from fio.fio_invoker import FIOInvoker class UnknownWorkload(Exception): @@ -37,8 +37,8 @@ class TestExecutor(object): self.event_listeners = set() self.metrics_converter = JSONToCarbon() self.metrics_emitter = CarbonMetricTransmitter() - self.prefix = socket.getfqdn() - self.job_id = None + self.prefix = None + self.job_db = JobDB() def register(self, event_listener): self.event_listeners.add(event_listener) @@ -50,6 +50,7 @@ class TestExecutor(object): carbon_metrics = self.metrics_converter.convert_to_dictionary( metric, self.prefix) + read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"] write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"] read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"] @@ -66,7 +67,7 @@ class TestExecutor(object): def register_workloads(self, workloads): - if (workloads is None or workloads.length() == 0): + if (workloads is None or len(workloads) == 0): workload_dir = os.path.normpath( os.path.join(os.path.dirname(__file__), "workloads")) @@ -112,26 +113,53 @@ class TestExecutor(object): return imp.load_source(mname, no_ext + '.py') return None - def create_job_id(self): - return 1234 - def execute(self): - if (self.job_id is None): - self.job_id = self.create_job_id() + + shortname = socket.getfqdn().split('.')[0] invoker = FIOInvoker() invoker.register(self.event) - - for numjobs in [1, 2, 4]: - - for workload_module in self.workload_modules: - constructor = getattr(workload_module, "__name__") - constructorMethod = getattr(workload_module, constructor) - self.logger.debug( - "Found constructor: " + str(constructorMethod)) - workload = constructorMethod() - workload.filename = self.filename - workload.invoker = invoker - workload.options['iodepth'] = str(numjobs) - self.logger.info("Executing workload: " + constructor) - workload.execute() + self.job_db.create_job_id() + self.logger.info("Starting job " + self.job_db.job_id) + + for workload_module in self.workload_modules: + + workload_name = getattr(workload_module, "__name__") + constructorMethod = getattr(workload_module, workload_name) + self.logger.debug( + "Found workload: " + str(constructorMethod)) + workload = constructorMethod() + workload.filename = self.filename + workload.invoker = invoker + + if (workload_name.startswith("_")): + iodepths = [2, ] + blocksizes = [4096, ] + else: + iodepths = [1, 16, 128] + blocksizes = [4096, 65536, 1048576] + + for blocksize in blocksizes: + for iodepth in iodepths: + + full_workload_name = workload_name + \ + ".queue-depth." + str(iodepth) + \ + ".block-size." + str(blocksize) + + workload.options['iodepth'] = str(iodepth) + workload.options['bs'] = str(blocksize) + self.logger.info( + "Executing workload: " + full_workload_name) + + self.prefix = shortname + "." + self.job_db.job_id + \ + "." + full_workload_name + + self.job_db.start_workload(full_workload_name) + workload.execute() + self.job_db.end_workload(full_workload_name) + + self.logger.info("Finished job " + self.job_db.job_id) + + def fetch_results(self, job, workload_name=""): + self.job_db.job_id = job + return self.job_db.fetch_results(workload_name) diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py new file mode 100644 index 0000000..d9b10a2 --- /dev/null +++ b/storperf/tests/db_tests/job_db_test.py @@ -0,0 +1,174 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import unittest + +import mock + +from db.job_db import JobDB + + +class JobDBTest(unittest.TestCase): + + def setUp(self): + JobDB.db_name = ":memory:" + self.job = JobDB() + + @mock.patch("uuid.uuid4") + def test_create_job(self, mock_uuid): + expected = "ABCDE-12345" + mock_uuid.side_effect = (expected,) + + self.job.create_job_id() + + actual = self.job.job_id + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + @mock.patch("uuid.uuid4") + def test_duplicate_job_generated(self, mock_uuid): + duplicate = "EDCBA-12345" + expected = "EDCBA-54321" + + mock_uuid.side_effect = (duplicate, duplicate, expected,) + + self.job.create_job_id() + self.job.create_job_id() + + actual = self.job.job_id + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + @mock.patch("uuid.uuid4") + @mock.patch("calendar.timegm") + def test_start_job(self, mock_calendar, mock_uuid): + job_id = "ABCDE-12345" + start_time = "12345" + mock_calendar.side_effect = (start_time,) + mock_uuid.side_effect = (job_id,) + workload_name = "Workload" + + cursor = self.job.db.cursor() + + row = cursor.execute( + """select * from jobs + where job_id = ? + and workload = ?""", + (job_id, workload_name,)) + + self.assertEqual(None, + row.fetchone(), + "Should not have been a row in the db") + + self.job.start_workload(workload_name) + + cursor.execute( + """select job_id, workload, start from jobs + where job_id = ? + and workload = ?""", + (job_id, workload_name,)) + + row = cursor.fetchone() + + self.assertNotEqual(None, row, "Should be a row in the db") + self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) + self.assertEqual( + workload_name, row[1], "Did not expect " + str(row[1])) + self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) + + @mock.patch("uuid.uuid4") + @mock.patch("calendar.timegm") + def test_end_job(self, mock_calendar, mock_uuid): + job_id = "ABCDE-12345" + start_time = "12345" + end_time = "54321" + mock_calendar.side_effect = (start_time, end_time,) + mock_uuid.side_effect = (job_id,) + workload_name = "Workload" + + self.job.start_workload(workload_name) + self.job.end_workload(workload_name) + + cursor = self.job.db.cursor() + cursor.execute( + """select job_id, workload, start, end from jobs + where job_id = ? + and workload = ?""", + (job_id, workload_name,)) + + row = cursor.fetchone() + + self.assertNotEqual(None, row, "Should be a row in the db") + self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) + self.assertEqual( + workload_name, row[1], "Did not expect " + str(row[1])) + self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) + self.assertEqual(end_time, row[3], "Did not expect " + str(row[3])) + + @mock.patch("uuid.uuid4") + @mock.patch("calendar.timegm") + def test_duplicate_start_job(self, mock_calendar, mock_uuid): + job_id = "ABCDE-12345" + start_time_1 = "12345" + start_time_2 = "12346" + + mock_calendar.side_effect = (start_time_1, start_time_2) + mock_uuid.side_effect = (job_id,) + workload_name = "Workload" + + cursor = self.job.db.cursor() + + self.job.start_workload(workload_name) + self.job.start_workload(workload_name) + + cursor.execute( + """select job_id, workload, start from jobs + where job_id = ? + and workload = ?""", + (job_id, workload_name,)) + + row = cursor.fetchone() + + self.assertNotEqual(None, row, "Should be a row in the db") + self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) + self.assertEqual( + workload_name, row[1], "Did not expect " + str(row[1])) + self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2])) + + @mock.patch("uuid.uuid4") + @mock.patch("calendar.timegm") + def test_end_job_without_start(self, mock_calendar, mock_uuid): + job_id = "ABCDE-12345" + start_time = "12345" + end_time = "54321" + mock_calendar.side_effect = (start_time, end_time,) + mock_uuid.side_effect = (job_id,) + workload_name = "Workload" + + self.job.end_workload(workload_name) + + cursor = self.job.db.cursor() + cursor.execute( + """select job_id, workload, start, end from jobs + where job_id = ? + and workload = ?""", + (job_id, workload_name,)) + + row = cursor.fetchone() + + self.assertNotEqual(None, row, "Should be a row in the db") + self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) + self.assertEqual( + workload_name, row[1], "Did not expect " + str(row[1])) + # The start time is set to the same time as end if it was never set + # before + self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) + self.assertEqual(start_time, row[3], "Did not expect " + str(row[3])) diff --git a/storperf/workloads/_ssd_preconditioning.py b/storperf/workloads/_ssd_preconditioning.py index 66d5fa1..e1e8bef 100644 --- a/storperf/workloads/_ssd_preconditioning.py +++ b/storperf/workloads/_ssd_preconditioning.py @@ -13,4 +13,5 @@ class _ssd_preconditioning(_base_workload._base_workload): def setup(self): self.options['name'] = 'ssd_preconditioning' - self.options['rw'] = 'write' + self.options['rw'] = 'randwrite' + self.options['loops'] = '1' diff --git a/storperf/workloads/_warm_up.py b/storperf/workloads/_warm_up.py index 8eaa2f1..27667ca 100644 --- a/storperf/workloads/_warm_up.py +++ b/storperf/workloads/_warm_up.py @@ -12,6 +12,6 @@ from workloads import _base_workload class _warm_up(_base_workload._base_workload): def setup(self): - self.options['name'] = 'ssd_preconditioning' - self.options['rw'] = 'randwrite' - self.options['loops'] = '4' + self.options['name'] = 'warm_up' + self.options['rw'] = 'write' + self.options['loops'] = '1' -- cgit 1.2.3-korg