summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormbeierl <mark.beierl@emc.com>2015-12-11 15:31:17 -0500
committermbeierl <mark.beierl@emc.com>2015-12-11 15:31:17 -0500
commitf099c8aaa7aeae805f7534382bfef789894abffb (patch)
treef0f1faf3def935d9a6fe6f0f2a1e26ddb3815dd2
parent5108c90fa276adb761d294c963d0903395950864 (diff)
Workload reporting
Use a local db to track start/end times of runs so we can go back to the carbon db and summarize values at reporting time based off the raw data. Change-Id: Ie62afd339fd1c15d82bc56c93c7cba5bd4f90fe2 JIRA: STORPERF-29 Signed-off-by: mbeierl <mark.beierl@emc.com>
-rw-r--r--storperf/carbon/emitter.py3
-rw-r--r--storperf/db/__init__.py0
-rw-r--r--storperf/db/job_db.py197
-rw-r--r--storperf/fio/fio_invoker.py46
-rw-r--r--storperf/main.py20
-rw-r--r--storperf/test_executor.py82
-rw-r--r--storperf/tests/db_tests/job_db_test.py174
-rw-r--r--storperf/workloads/_ssd_preconditioning.py3
-rw-r--r--storperf/workloads/_warm_up.py6
9 files changed, 474 insertions, 57 deletions
diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py
index e949238..8a9f734 100644
--- a/storperf/carbon/emitter.py
+++ b/storperf/carbon/emitter.py
@@ -6,9 +6,8 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import logging
-
import calendar
+import logging
import socket
import time
diff --git a/storperf/db/__init__.py b/storperf/db/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/storperf/db/__init__.py
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py
new file mode 100644
index 0000000..a65fa78
--- /dev/null
+++ b/storperf/db/job_db.py
@@ -0,0 +1,197 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from _sqlite3 import OperationalError
+import calendar
+import logging
+import sqlite3
+import time
+import uuid
+
+import requests
+
+
+class JobDB(object):
+
+ db_name = "StorPerf.db"
+
+ def __init__(self):
+ """
+ Creates the StorPerf.db and jobs tables on demand
+ """
+
+ self.logger = logging.getLogger(__name__)
+ self.logger.debug("Connecting to " + JobDB.db_name)
+ self.db = sqlite3.connect(JobDB.db_name)
+ self.job_id = None
+
+ cursor = self.db.cursor()
+ try:
+ cursor.execute('''CREATE TABLE jobs
+ (job_id text,
+ workload text,
+ start text,
+ end text)''')
+ self.logger.debug("Created job table")
+ except OperationalError:
+ self.logger.debug("Job table exists")
+
+ cursor.execute('SELECT * FROM jobs')
+
+ def create_job_id(self):
+ """
+ Returns a job id that is guaranteed to be unique in this
+ StorPerf instance.
+ """
+ cursor = self.db.cursor()
+
+ self.job_id = str(uuid.uuid4())
+ row = cursor.execute(
+ "select * from jobs where job_id = ?", (self.job_id,))
+
+ while (row.fetchone() is not None):
+ self.logger.info("Duplicate job id found, regenerating")
+ self.job_id = str(uuid.uuid4())
+ row = cursor.execute(
+ "select * from jobs where job_id = ?", (self.job_id,))
+
+ cursor.execute(
+ "insert into jobs(job_id) values (?)", (self.job_id,))
+ self.logger.debug("Reserved job id " + self.job_id)
+ self.db.commit()
+
+ def start_workload(self, workload_name):
+ """
+ Records the start time for the given workload
+ """
+ if (self.job_id is None):
+ self.create_job_id()
+
+ cursor = self.db.cursor()
+ now = str(calendar.timegm(time.gmtime()))
+
+ row = cursor.execute(
+ """select * from jobs
+ where job_id = ?
+ and workload = ?""",
+ (self.job_id, workload_name,))
+
+ if (row.fetchone() is None):
+ cursor.execute(
+ """insert into jobs
+ (job_id,
+ workload,
+ start)
+ values (?, ?, ?)""",
+ (self.job_id,
+ workload_name,
+ now,))
+ else:
+ self.logger.warn("Duplicate start time for workload "
+ + workload_name)
+ cursor.execute(
+ """update jobs set
+ job_id = ?,
+ start = ?
+ where workload = ?""",
+ (self.job_id,
+ now,
+ workload_name,))
+
+ self.db.commit()
+
+ def end_workload(self, workload_name):
+ """
+ Records the end time for the given workload
+ """
+ if (self.job_id is None):
+ self.create_job_id()
+
+ cursor = self.db.cursor()
+ now = str(calendar.timegm(time.gmtime()))
+
+ row = cursor.execute(
+ """select * from jobs
+ where job_id = ?
+ and workload = ?""",
+ (self.job_id, workload_name,))
+
+ if (row.fetchone() is None):
+ self.logger.warn("No start time recorded for workload "
+ + workload_name)
+ cursor.execute(
+ """insert into jobs
+ (job_id,
+ workload,
+ start,
+ end)
+ values (?, ?, ?, ?)""",
+ (self.job_id,
+ workload_name,
+ now,
+ now))
+ else:
+ cursor.execute(
+ """update jobs set
+ job_id = ?,
+ end = ?
+ where workload = ?""",
+ (self.job_id,
+ now,
+ workload_name,))
+
+ self.db.commit()
+
+ def fetch_results(self, workload_prefix=""):
+ if (workload_prefix is None):
+ workload_prefix = ""
+
+ workload_prefix = workload_prefix + "%"
+
+ stats = ()
+
+ start_time = str(calendar.timegm(time.gmtime()))
+ end_time = "0"
+
+ self.logger.debug("Workload like: " + workload_prefix)
+
+ cursor = self.db.cursor()
+ cursor.execute("""select start, end, workload
+ from jobs where workload like ?""",
+ (workload_prefix,))
+
+ while (True):
+ row = cursor.fetchone()
+ if (row is None):
+ break
+
+ start_time = str(row[0])
+ end_time = str(row[1])
+ workload = str(row[2])
+
+ # for most of these stats, we just want the final one
+ # as that is cumulative average or whatever for the whole
+ # run
+
+ self.logger.info("workload=" + workload +
+ "start=" + start_time + " end=" + end_time)
+
+ request = 'http://127.0.0.1:8000/render/?target=*.' + self.job_id + \
+ '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \
+ start_time + "&until=" + end_time
+
+ print '\n\t' + request + '\n'
+
+ response = requests.get(request)
+
+ if (response.status_code == 200):
+ data = response.json()
+ print data
+ else:
+ pass
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index be1b37c..0b13349 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -7,11 +7,10 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import subprocess
import json
-from threading import Thread
-
import logging
+import subprocess
+from threading import Thread
class FIOInvoker(object):
@@ -28,30 +27,33 @@ class FIOInvoker(object):
def stdout_handler(self):
self.json_body = ""
- for line in iter(self.fio_process.stdout.readline, b''):
- if line.startswith("fio"):
- line = ""
- continue
- self.json_body += line
- try:
- if line == "}\n":
- self.logger.debug(
- "Have a json snippet: %s", self.json_body)
- json_metric = json.loads(self.json_body)
- self.json_body = ""
-
- for event_listener in self.event_listeners:
- event_listener(json_metric)
-
- except Exception, e:
- self.logger.error("Error parsing JSON: %s", e)
- pass
+ try:
+ for line in iter(self.fio_process.stdout.readline, b''):
+ if line.startswith("fio"):
+ line = ""
+ continue
+ self.json_body += line
+ try:
+ if line == "}\n":
+ self.logger.debug(
+ "Have a json snippet: %s", self.json_body)
+ json_metric = json.loads(self.json_body)
+ self.json_body = ""
+
+ for event_listener in self.event_listeners:
+ event_listener(json_metric)
+
+ except Exception, e:
+ self.logger.error("Error parsing JSON: %s", e)
+ pass
+ except ValueError:
+ pass # We might have read from the closed socket, ignore it
self.fio_process.stdout.close()
def stderr_handler(self):
for line in iter(self.fio_process.stderr.readline, b''):
- print line
+ self.logger.error("FIO Error: %s", line)
self.fio_process.stderr.close()
diff --git a/storperf/main.py b/storperf/main.py
index 2423b99..11357f4 100644
--- a/storperf/main.py
+++ b/storperf/main.py
@@ -52,18 +52,22 @@ def main(argv=None):
setup_logging()
test_executor = TestExecutor()
verbose = False
+ debug = False
workloads = None
+ report = None
if argv is None:
argv = sys.argv
try:
try:
- opts, args = getopt.getopt(argv[1:], "t:w:scvh",
+ opts, args = getopt.getopt(argv[1:], "t:w:r:scvdh",
["target=",
"workload=",
+ "report=",
"nossd",
"nowarm",
"verbose",
+ "debug",
"help",
])
except getopt.error, msg:
@@ -75,14 +79,23 @@ def main(argv=None):
return 0
elif o in ("-t", "--target"):
test_executor.filename = a
+ elif o in ("-t", "--target"):
+ report = a
elif o in ("-v", "--verbose"):
verbose = True
+ elif o in ("-d", "--debug"):
+ debug = True
elif o in ("-s", "--nossd"):
test_executor.precondition = False
elif o in ("-c", "--nowarm"):
test_executor.warm = False
elif o in ("-w", "--workload"):
workloads = a.split(",")
+ elif o in ("-r", "--report"):
+ report = a
+
+ if (debug):
+ logging.getLogger().setLevel(logging.DEBUG)
test_executor.register_workloads(workloads)
@@ -98,7 +111,10 @@ def main(argv=None):
if (verbose):
test_executor.register(event)
- test_executor.execute()
+ if (report is not None):
+ print test_executor.fetch_results(report, workloads)
+ else:
+ test_executor.execute()
if __name__ == "__main__":
sys.exit(main())
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 5fb88d4..462f06b 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -9,15 +9,15 @@
import imp
import logging
-import os
-import socket
-
from os import listdir
+import os
from os.path import isfile, join
+import socket
-from fio.fio_invoker import FIOInvoker
-from carbon.emitter import CarbonMetricTransmitter
from carbon.converter import JSONToCarbon
+from carbon.emitter import CarbonMetricTransmitter
+from db.job_db import JobDB
+from fio.fio_invoker import FIOInvoker
class UnknownWorkload(Exception):
@@ -37,8 +37,8 @@ class TestExecutor(object):
self.event_listeners = set()
self.metrics_converter = JSONToCarbon()
self.metrics_emitter = CarbonMetricTransmitter()
- self.prefix = socket.getfqdn()
- self.job_id = None
+ self.prefix = None
+ self.job_db = JobDB()
def register(self, event_listener):
self.event_listeners.add(event_listener)
@@ -50,6 +50,7 @@ class TestExecutor(object):
carbon_metrics = self.metrics_converter.convert_to_dictionary(
metric,
self.prefix)
+
read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"]
write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"]
read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"]
@@ -66,7 +67,7 @@ class TestExecutor(object):
def register_workloads(self, workloads):
- if (workloads is None or workloads.length() == 0):
+ if (workloads is None or len(workloads) == 0):
workload_dir = os.path.normpath(
os.path.join(os.path.dirname(__file__), "workloads"))
@@ -112,26 +113,53 @@ class TestExecutor(object):
return imp.load_source(mname, no_ext + '.py')
return None
- def create_job_id(self):
- return 1234
-
def execute(self):
- if (self.job_id is None):
- self.job_id = self.create_job_id()
+
+ shortname = socket.getfqdn().split('.')[0]
invoker = FIOInvoker()
invoker.register(self.event)
-
- for numjobs in [1, 2, 4]:
-
- for workload_module in self.workload_modules:
- constructor = getattr(workload_module, "__name__")
- constructorMethod = getattr(workload_module, constructor)
- self.logger.debug(
- "Found constructor: " + str(constructorMethod))
- workload = constructorMethod()
- workload.filename = self.filename
- workload.invoker = invoker
- workload.options['iodepth'] = str(numjobs)
- self.logger.info("Executing workload: " + constructor)
- workload.execute()
+ self.job_db.create_job_id()
+ self.logger.info("Starting job " + self.job_db.job_id)
+
+ for workload_module in self.workload_modules:
+
+ workload_name = getattr(workload_module, "__name__")
+ constructorMethod = getattr(workload_module, workload_name)
+ self.logger.debug(
+ "Found workload: " + str(constructorMethod))
+ workload = constructorMethod()
+ workload.filename = self.filename
+ workload.invoker = invoker
+
+ if (workload_name.startswith("_")):
+ iodepths = [2, ]
+ blocksizes = [4096, ]
+ else:
+ iodepths = [1, 16, 128]
+ blocksizes = [4096, 65536, 1048576]
+
+ for blocksize in blocksizes:
+ for iodepth in iodepths:
+
+ full_workload_name = workload_name + \
+ ".queue-depth." + str(iodepth) + \
+ ".block-size." + str(blocksize)
+
+ workload.options['iodepth'] = str(iodepth)
+ workload.options['bs'] = str(blocksize)
+ self.logger.info(
+ "Executing workload: " + full_workload_name)
+
+ self.prefix = shortname + "." + self.job_db.job_id + \
+ "." + full_workload_name
+
+ self.job_db.start_workload(full_workload_name)
+ workload.execute()
+ self.job_db.end_workload(full_workload_name)
+
+ self.logger.info("Finished job " + self.job_db.job_id)
+
+ def fetch_results(self, job, workload_name=""):
+ self.job_db.job_id = job
+ return self.job_db.fetch_results(workload_name)
diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py
new file mode 100644
index 0000000..d9b10a2
--- /dev/null
+++ b/storperf/tests/db_tests/job_db_test.py
@@ -0,0 +1,174 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import unittest
+
+import mock
+
+from db.job_db import JobDB
+
+
+class JobDBTest(unittest.TestCase):
+
+ def setUp(self):
+ JobDB.db_name = ":memory:"
+ self.job = JobDB()
+
+ @mock.patch("uuid.uuid4")
+ def test_create_job(self, mock_uuid):
+ expected = "ABCDE-12345"
+ mock_uuid.side_effect = (expected,)
+
+ self.job.create_job_id()
+
+ actual = self.job.job_id
+
+ self.assertEqual(
+ expected, actual, "Did not expect: " + str(actual))
+
+ @mock.patch("uuid.uuid4")
+ def test_duplicate_job_generated(self, mock_uuid):
+ duplicate = "EDCBA-12345"
+ expected = "EDCBA-54321"
+
+ mock_uuid.side_effect = (duplicate, duplicate, expected,)
+
+ self.job.create_job_id()
+ self.job.create_job_id()
+
+ actual = self.job.job_id
+
+ self.assertEqual(
+ expected, actual, "Did not expect: " + str(actual))
+
+ @mock.patch("uuid.uuid4")
+ @mock.patch("calendar.timegm")
+ def test_start_job(self, mock_calendar, mock_uuid):
+ job_id = "ABCDE-12345"
+ start_time = "12345"
+ mock_calendar.side_effect = (start_time,)
+ mock_uuid.side_effect = (job_id,)
+ workload_name = "Workload"
+
+ cursor = self.job.db.cursor()
+
+ row = cursor.execute(
+ """select * from jobs
+ where job_id = ?
+ and workload = ?""",
+ (job_id, workload_name,))
+
+ self.assertEqual(None,
+ row.fetchone(),
+ "Should not have been a row in the db")
+
+ self.job.start_workload(workload_name)
+
+ cursor.execute(
+ """select job_id, workload, start from jobs
+ where job_id = ?
+ and workload = ?""",
+ (job_id, workload_name,))
+
+ row = cursor.fetchone()
+
+ self.assertNotEqual(None, row, "Should be a row in the db")
+ self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
+ self.assertEqual(
+ workload_name, row[1], "Did not expect " + str(row[1]))
+ self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
+
+ @mock.patch("uuid.uuid4")
+ @mock.patch("calendar.timegm")
+ def test_end_job(self, mock_calendar, mock_uuid):
+ job_id = "ABCDE-12345"
+ start_time = "12345"
+ end_time = "54321"
+ mock_calendar.side_effect = (start_time, end_time,)
+ mock_uuid.side_effect = (job_id,)
+ workload_name = "Workload"
+
+ self.job.start_workload(workload_name)
+ self.job.end_workload(workload_name)
+
+ cursor = self.job.db.cursor()
+ cursor.execute(
+ """select job_id, workload, start, end from jobs
+ where job_id = ?
+ and workload = ?""",
+ (job_id, workload_name,))
+
+ row = cursor.fetchone()
+
+ self.assertNotEqual(None, row, "Should be a row in the db")
+ self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
+ self.assertEqual(
+ workload_name, row[1], "Did not expect " + str(row[1]))
+ self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
+ self.assertEqual(end_time, row[3], "Did not expect " + str(row[3]))
+
+ @mock.patch("uuid.uuid4")
+ @mock.patch("calendar.timegm")
+ def test_duplicate_start_job(self, mock_calendar, mock_uuid):
+ job_id = "ABCDE-12345"
+ start_time_1 = "12345"
+ start_time_2 = "12346"
+
+ mock_calendar.side_effect = (start_time_1, start_time_2)
+ mock_uuid.side_effect = (job_id,)
+ workload_name = "Workload"
+
+ cursor = self.job.db.cursor()
+
+ self.job.start_workload(workload_name)
+ self.job.start_workload(workload_name)
+
+ cursor.execute(
+ """select job_id, workload, start from jobs
+ where job_id = ?
+ and workload = ?""",
+ (job_id, workload_name,))
+
+ row = cursor.fetchone()
+
+ self.assertNotEqual(None, row, "Should be a row in the db")
+ self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
+ self.assertEqual(
+ workload_name, row[1], "Did not expect " + str(row[1]))
+ self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2]))
+
+ @mock.patch("uuid.uuid4")
+ @mock.patch("calendar.timegm")
+ def test_end_job_without_start(self, mock_calendar, mock_uuid):
+ job_id = "ABCDE-12345"
+ start_time = "12345"
+ end_time = "54321"
+ mock_calendar.side_effect = (start_time, end_time,)
+ mock_uuid.side_effect = (job_id,)
+ workload_name = "Workload"
+
+ self.job.end_workload(workload_name)
+
+ cursor = self.job.db.cursor()
+ cursor.execute(
+ """select job_id, workload, start, end from jobs
+ where job_id = ?
+ and workload = ?""",
+ (job_id, workload_name,))
+
+ row = cursor.fetchone()
+
+ self.assertNotEqual(None, row, "Should be a row in the db")
+ self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
+ self.assertEqual(
+ workload_name, row[1], "Did not expect " + str(row[1]))
+ # The start time is set to the same time as end if it was never set
+ # before
+ self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
+ self.assertEqual(start_time, row[3], "Did not expect " + str(row[3]))
diff --git a/storperf/workloads/_ssd_preconditioning.py b/storperf/workloads/_ssd_preconditioning.py
index 66d5fa1..e1e8bef 100644
--- a/storperf/workloads/_ssd_preconditioning.py
+++ b/storperf/workloads/_ssd_preconditioning.py
@@ -13,4 +13,5 @@ class _ssd_preconditioning(_base_workload._base_workload):
def setup(self):
self.options['name'] = 'ssd_preconditioning'
- self.options['rw'] = 'write'
+ self.options['rw'] = 'randwrite'
+ self.options['loops'] = '1'
diff --git a/storperf/workloads/_warm_up.py b/storperf/workloads/_warm_up.py
index 8eaa2f1..27667ca 100644
--- a/storperf/workloads/_warm_up.py
+++ b/storperf/workloads/_warm_up.py
@@ -12,6 +12,6 @@ from workloads import _base_workload
class _warm_up(_base_workload._base_workload):
def setup(self):
- self.options['name'] = 'ssd_preconditioning'
- self.options['rw'] = 'randwrite'
- self.options['loops'] = '4'
+ self.options['name'] = 'warm_up'
+ self.options['rw'] = 'write'
+ self.options['loops'] = '1'