Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r--  storperf/test_executor.py | 68
1 file changed, 65 insertions(+), 3 deletions(-)
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 8230174..592c33d 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -1,5 +1,5 @@
##############################################################################
-# Copyright (c) 2015 EMC and others.
+# Copyright (c) 2016 EMC and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
@@ -11,8 +11,11 @@ from os import listdir
from os.path import isfile, join
from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
+from storperf.db import test_results_db
+from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
+from storperf.utilities import dictionary
from threading import Thread
import copy
import imp
@@ -35,6 +38,7 @@ class TestExecutor(object):
self.precondition = True
self.deadline = None
self.warm = True
+ self.metadata = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -138,9 +142,12 @@ class TestExecutor(object):
return imp.load_source(mname, no_ext + '.py')
return None
- def execute(self):
+ def execute(self, metadata):
self.job_db.create_job_id()
- self._workload_thread = Thread(target=self.execute_workloads, args=())
+ self.job_db.record_workload_params(metadata)
+ self.metadata = metadata
+ self._workload_thread = Thread(target=self.execute_workloads,
+ args=())
self._workload_thread.start()
return self.job_db.job_id
@@ -158,8 +165,14 @@ class TestExecutor(object):
def execute_workloads(self):
self._terminated = False
+ self.logger.info("Starting job %s" % (self.job_db.job_id))
+
+ start_time = time.time()
+
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
+ self.logger.info("Starting workload %s" % (workload_name))
+
constructorMethod = getattr(workload_module, workload_name)
workload = constructorMethod()
if (self.filename is not None):
@@ -215,6 +228,55 @@ class TestExecutor(object):
self._workload_executors = []
+ self.logger.info("Completed workload %s" % (workload_name))
+ self.logger.info("Completed job %s" % (self.job_db.job_id))
+ end_time = time.time()
+ pod_name = dictionary.get_key_from_dict(self.metadata,
+ 'pod_name',
+ 'Unknown')
+ version = dictionary.get_key_from_dict(self.metadata,
+ 'version',
+ 'Unknown')
+ scenario = dictionary.get_key_from_dict(self.metadata,
+ 'scenario',
+ 'Unknown')
+ build_tag = dictionary.get_key_from_dict(self.metadata,
+ 'build_tag',
+ 'Unknown')
+ duration = end_time - start_time
+ test_db = os.environ.get('TEST_DB_URL')
+
+ if test_db is not None:
+ # I really do not like doing this. As our threads just
+ # terminated, their final results are still being spooled
+ # off to Carbon. Need to give that a little time to finish
+ time.sleep(5)
+ self.logger.info("Pushing results to %s" % (test_db))
+
+ payload = self.metadata
+ payload['timestart'] = start_time
+ payload['duration'] = duration
+ payload['status'] = 'OK'
+ graphite_db = GraphiteDB()
+ payload['metrics'] = graphite_db.fetch_averages(self.job_db.job_id)
+ criteria = {}
+ criteria['block_sizes'] = self.block_sizes
+ criteria['queue_depths'] = self.queue_depths
+
+ try:
+ test_results_db.push_results_to_db(test_db,
+ "storperf",
+ "Latency Test",
+ self.logger,
+ pod_name,
+ version,
+ scenario,
+ criteria,
+ build_tag,
+ payload)
+ except Exception:
+ self.logger.exception("Error pushing results into Database")
+
def execute_on_node(self, workload):
invoker = FIOInvoker()
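
Usage note: the execute() entry point now takes a metadata dictionary, records it against the job, and, when TEST_DB_URL is set, pushes a summary payload to the test results database after the workloads finish. The following is a minimal sketch of how a caller might exercise the new signature; the metadata values, the URL, and the direct invocation (rather than going through the REST layer) are assumptions for illustration only, not part of this change.

    import os
    from storperf.test_executor import TestExecutor

    # Assumed example metadata; in practice this would come from the REST
    # API caller.  Missing keys fall back to 'Unknown' via
    # dictionary.get_key_from_dict() in the results push.
    metadata = {
        'pod_name': 'example-pod',
        'version': 'example-version',
        'scenario': 'example-scenario',
        'build_tag': 'example-build',
    }

    # If TEST_DB_URL is set, results are pushed to that test results DB
    # once all workloads complete (placeholder URL shown here).
    os.environ['TEST_DB_URL'] = 'http://example.invalid/testapi'

    executor = TestExecutor()
    job_id = executor.execute(metadata)  # workloads run on a background thread
    print("Started job %s" % job_id)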