diff options
-rwxr-xr-x | ci/setup.py | 1 | ||||
-rw-r--r-- | rest_server.py | 89 | ||||
-rw-r--r-- | storperf/db/graphite_db.py | 5 | ||||
-rw-r--r-- | storperf/db/job_db.py | 54 | ||||
-rw-r--r-- | storperf/plot/__init__.py | 0 | ||||
-rw-r--r-- | storperf/plot/barchart.py | 92 | ||||
-rw-r--r-- | storperf/storperf_master.py | 26 | ||||
-rw-r--r-- | storperf/test_executor.py | 28 | ||||
-rw-r--r-- | storperf/tests/db_tests/job_db_test.py | 6 | ||||
-rw-r--r-- | storperf/tests/storperf_master_test.py | 18 | ||||
-rw-r--r-- | storperf/workloads/_base_workload.py | 2 |
11 files changed, 312 insertions, 9 deletions
diff --git a/ci/setup.py b/ci/setup.py index 2a02276..a293e9c 100755 --- a/ci/setup.py +++ b/ci/setup.py @@ -25,6 +25,7 @@ setup( "flask-restful==0.3.5", "flask-restful-swagger==0.19", "html2text==2016.1.8", + "matplotlib==1.3.1", "python-cinderclient==1.6.0", "python-glanceclient==1.1.0", "python-heatclient==0.8.0", diff --git a/rest_server.py b/rest_server.py index f0a817b..72f849a 100644 --- a/rest_server.py +++ b/rest_server.py @@ -7,7 +7,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from storperf.storperf_master import StorPerfMaster +import io import json import logging import logging.config @@ -15,7 +15,12 @@ import os from flask import abort, Flask, request, jsonify, send_from_directory from flask_restful import Resource, Api, fields + from flask_restful_swagger import swagger +from storperf.db.job_db import JobDB +from storperf.plot.barchart import Barchart +from storperf.storperf_master import StorPerfMaster + app = Flask(__name__, static_url_path="") api = swagger.docs(Api(app), apiVersion='1.0') @@ -28,6 +33,82 @@ def send_swagger(path): return send_from_directory('storperf/resources/html/swagger', path) +@app.route('/results/<path:job_id>') +def results_page(job_id): + + job_db = JobDB() + params = {} + + params = job_db.fetch_workload_params(job_id) + + results = storperf.fetch_results(job_id) + workloads = [] + block_sizes = [] + queue_depths = [] + + for key, value in results.iteritems(): + workload = key.split('.')[0] + queue_depth = int(key.split('.')[2]) + block_size = int(key.split('.')[4]) + if workload not in workloads: + workloads.append(workload) + if queue_depth not in queue_depths: + queue_depths.append(queue_depth) + if block_size not in block_sizes: + block_sizes.append(block_size) + + queue_depths.sort() + block_sizes.sort() + + read_latencies = [] + write_latencies = [] +# for workload in workloads: + workload = "rw" + + for queue_depth in queue_depths: + rlatencies = [] + read_latencies.append(rlatencies) + wlatencies = [] + write_latencies.append(wlatencies) + for block_size in block_sizes: + + key = "%s.queue-depth.%s.block-size.%s.read.latency" % \ + (workload, queue_depth, block_size) + + print key + "=" + str(results[key]) + if key in results: + rlatencies.append(results[key] / 1000) + key = "%s.queue-depth.%s.block-size.%s.write.latency" % \ + (workload, queue_depth, block_size) + if key in results: + wlatencies.append(results[key] / 1000) + + chart = Barchart() + chart.barchart3d(queue_depths, block_sizes, read_latencies, 'g', + 'Read Latency (ms)') + readchart = chart.to_base64_image() + + chart.barchart3d(queue_depths, block_sizes, write_latencies, 'r', + 'Write Latency (ms)') + writechart = chart.to_base64_image() + + html = """<html><body>%s <BR> + Number of VMs: %s <BR> + Cinder volume size per VM: %s (GB) <BR> + <center>Read Latency Report <BR> + <img src="data:image/png;base64,%s"/> + <center>Write Latency Report <BR> + <img src="data:image/png;base64,%s"/> + </body></html>""" % (job_id, + params['agent_count'], + params['volume_size'], + readchart, + writechart, + ) + + return html + + @swagger.model class ConfigurationRequestModel: resource_fields = { @@ -123,6 +204,8 @@ class WorkloadModel: 'nossd': fields.String, 'nowarm': fields.String, 'workload': fields.String, + 'queue_depths': fields.String, + 'block_sizes': fields.String } @@ -210,6 +293,10 @@ class Job(Resource): storperf.precondition = False if ('nowarm' in request.json): storperf.warm_up = False + if ('queue_depths' in request.json): + storperf.queue_depths = request.json['queue_depths'] + if ('block_sizes' in request.json): + storperf.block_sizes = request.json['block_sizes'] if ('workload' in request.json): storperf.workloads = request.json['workload'] else: diff --git a/storperf/db/graphite_db.py b/storperf/db/graphite_db.py index 8fef071..c44d2aa 100644 --- a/storperf/db/graphite_db.py +++ b/storperf/db/graphite_db.py @@ -108,7 +108,10 @@ class GraphiteDB(object): total += datapoint[0] count += 1 - average = total / count + if count > 0: + average = total / count + else: + average = total return average diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py index d42568a..412c6bc 100644 --- a/storperf/db/job_db.py +++ b/storperf/db/job_db.py @@ -45,7 +45,17 @@ class JobDB(object): except OperationalError: self.logger.debug("Job table exists") + try: + cursor.execute('''CREATE TABLE job_params + (job_id text, + param text, + value text)''') + self.logger.debug("Created job_params table") + except OperationalError: + self.logger.debug("Job params table exists") + cursor.execute('SELECT * FROM jobs') + cursor.execute('SELECT * FROM job_params') db.commit() db.close() @@ -177,7 +187,7 @@ class JobDB(object): with db_mutex: db = sqlite3.connect(JobDB.db_name) cursor = db.cursor() - cursor.execute("""select workload, start, end + cursor.execute("""select workload, start, end from jobs where workload like ?""", (workload_prefix,)) @@ -190,3 +200,45 @@ class JobDB(object): db.close() return workload_executions + + def record_workload_params(self, job_id, params): + """ + """ + with db_mutex: + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + for param, value in params.iteritems(): + cursor.execute( + """insert into job_params + (job_id, + param, + value) + values (?, ?, ?)""", + (job_id, + param, + value,)) + db.commit() + db.close() + + def fetch_workload_params(self, job_id): + """ + """ + params = {} + with db_mutex: + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + + cursor.execute( + "select param, value from job_params where job_id = ?", + (job_id,)) + + while (True): + row = cursor.fetchone() + if (row is None): + break + params[row[0]] = row[1] + + db.close() + return params diff --git a/storperf/plot/__init__.py b/storperf/plot/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/storperf/plot/__init__.py diff --git a/storperf/plot/barchart.py b/storperf/plot/barchart.py new file mode 100644 index 0000000..871defa --- /dev/null +++ b/storperf/plot/barchart.py @@ -0,0 +1,92 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import cStringIO + +import matplotlib as mpl +mpl.use('Agg') +import matplotlib.pyplot as pyplot +from mpl_toolkits.mplot3d import Axes3D +import numpy as np + + +class Barchart(object): + + def __init__(self): + pass + + def barchart3d(self, queue_depths, block_sizes, latencies, c, title): + + fig = pyplot.figure() + + #ax = Axes3D(fig) + + data = np.array(latencies) + + lx = len(data[0]) # Work out matrix dimensions + ly = len(data[:, 0]) + xpos = np.arange(0, lx, 1) # Set up a mesh of positions + ypos = np.arange(0, ly, 1) + xpos, ypos = np.meshgrid(xpos + 0.25, ypos + 0.25) + + xpos = xpos.flatten() # Convert positions to 1D array + ypos = ypos.flatten() + zpos = np.zeros(lx * ly) + + dx = 0.5 * np.ones_like(zpos) + dy = dx.copy() + dz = data.flatten() + + ax = fig.add_subplot(111, projection='3d') + ax.bar3d(xpos, ypos, zpos, dx, dy, dz, color=c) + + ticksx = np.arange(0.5, len(block_sizes), 1) + pyplot.xticks(ticksx, block_sizes) + + ticksy = np.arange(0.6, len(queue_depths), 1) + pyplot.yticks(ticksy, queue_depths) + + ax.set_xlabel('Block Size') + ax.set_ylabel('Queue Depth') + ax.set_zlabel(title) + + ticksx = np.arange(0.5, 3, 1) + pyplot.xticks(ticksx, block_sizes) + + def barchart(self, queue_depths, block_sizes, read_latencies): + pyplot.figure() + + y_pos = np.arange(len(block_sizes)) + bar_width = 0.15 + + colors = ['r', 'g', 'b', 'y'] + legend = [] + index = 0 + for series in queue_depths: + chart = pyplot.bar(y_pos + (bar_width * index), + read_latencies[index], + bar_width, + color=colors[index], + align='center', + label="Queue Depth " + str(series), + alpha=0.4) + legend.append(chart[0]) + index += 1 + + pyplot.xticks(y_pos + bar_width, block_sizes) + pyplot.ylabel("Latency (Microseconds)") + pyplot.xlabel("Block Sizes (bytes)") + pyplot.title("Latency Report") + pyplot.legend() + pyplot.tight_layout() + + def to_base64_image(self): + sio = cStringIO.StringIO() + pyplot.savefig(sio, format="png") + return sio.getvalue().encode("base64").strip() diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py index 2a03753..c7739da 100644 --- a/storperf/storperf_master.py +++ b/storperf/storperf_master.py @@ -15,6 +15,7 @@ import os import subprocess from db.configuration_db import ConfigurationDB +from db.job_db import JobDB from test_executor import TestExecutor import cinderclient.v2 as cinderclient import heatclient.client as heatclient @@ -31,6 +32,7 @@ class StorPerfMaster(object): self.logger = logging.getLogger(__name__) self.configuration_db = ConfigurationDB() + self.job_db = JobDB() template_file = open("storperf/resources/hot/agent-group.yaml") self._agent_group_hot = template_file.read() @@ -159,6 +161,22 @@ class StorPerfMaster(object): self._test_executor.warm = value @property + def queue_depths(self): + return self._test_executor.queue_depths + + @queue_depths.setter + def queue_depths(self, value): + self._test_executor.queue_depths = value + + @property + def block_sizes(self): + return self._test_executor.block_sizes + + @block_sizes.setter + def block_sizes(self, value): + self._test_executor.block_sizes = value + + @property def is_stack_created(self): if (self.stack_id is not None): self._attach_to_openstack() @@ -262,7 +280,13 @@ class StorPerfMaster(object): thread.join() self._test_executor.slaves = slaves - return self._test_executor.execute() + job_id = self._test_executor.execute() + params = {} + params['agent_count'] = self.agent_count + params['public_network'] = self.public_network + params['volume_size'] = self.volume_size + self.job_db.record_workload_params(job_id, params) + return job_id def terminate_workloads(self): return self._test_executor.terminate() diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 309fbcb..6b9c441 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -32,6 +32,8 @@ class TestExecutor(object): self.filename = None self.precondition = True self.warm = True + self._queue_depths = [1, 4, 8] + self._block_sizes = [512, 4096, 16384] self.event_listeners = set() self.metrics_converter = Converter() self.metrics_emitter = CarbonMetricTransmitter() @@ -51,6 +53,24 @@ class TestExecutor(object): self.logger.debug("Set slaves to: " + str(slaves)) self._slaves = slaves + @property + def queue_depths(self): + return ','.join(self._queue_depths) + + @queue_depths.setter + def queue_depths(self, queue_depths): + self.logger.debug("Set queue_depths to: " + str(queue_depths)) + self._queue_depths = queue_depths.split(',') + + @property + def block_sizes(self): + return ','.join(self._block_sizes) + + @block_sizes.setter + def block_sizes(self, block_sizes): + self.logger.debug("Set block_sizes to: " + str(block_sizes)) + self._block_sizes = block_sizes.split(',') + def register(self, event_listener): self.event_listeners.add(event_listener) @@ -138,11 +158,11 @@ class TestExecutor(object): workload.filename = self.filename if (workload_name.startswith("_")): - iodepths = [32, ] - blocksizes = [8192, ] + iodepths = [8, ] + blocksizes = [16384, ] else: - iodepths = [128, 16, 1] - blocksizes = [8192, 4096, 512] + iodepths = self._queue_depths + blocksizes = self._block_sizes workload.id = self.job_db.job_id diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py index 92b1482..0972f84 100644 --- a/storperf/tests/db_tests/job_db_test.py +++ b/storperf/tests/db_tests/job_db_test.py @@ -183,3 +183,9 @@ class JobDBTest(unittest.TestCase): # before self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) self.assertEqual(start_time, row[3], "Did not expect " + str(row[3])) + + def test_job_params(self): + expected = {"a": "1", "b": "2"} + self.job.record_workload_params("ABCD", expected) + actual = self.job.fetch_workload_params("ABCD") + self.assertEqual(expected, actual) diff --git a/storperf/tests/storperf_master_test.py b/storperf/tests/storperf_master_test.py index 33c1699..2dc810d 100644 --- a/storperf/tests/storperf_master_test.py +++ b/storperf/tests/storperf_master_test.py @@ -32,6 +32,24 @@ class StorPerfMasterTest(unittest.TestCase): self.assertEqual( expected, actual, "Did not expect: " + str(actual)) + def test_queue_depths(self): + expected = "1,2,3" + + self.storperf.queue_depths = expected + actual = self.storperf.queue_depths + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_block_sizes(self): + expected = "8,2,1,0" + + self.storperf.block_sizes = expected + actual = self.storperf.block_sizes + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + def test_volume_size(self): expected = 20 diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py index dc448fd..874e99c 100644 --- a/storperf/workloads/_base_workload.py +++ b/storperf/workloads/_base_workload.py @@ -23,7 +23,7 @@ class _base_workload(object): 'bs': '64k', 'iodepth': '1', 'numjobs': '1', - 'loops': '2', + 'loops': '1', 'output-format': 'json', 'status-interval': '60' } |