author    mbeierl <mark.beierl@dell.com>  2017-01-20 16:05:24 -0500
committer mbeierl <mark.beierl@dell.com>  2017-01-20 16:05:24 -0500
commit    29cab6cd9d6e669c74a1dd6960aba8250f539c2f (patch)
tree      2f45b65cb23ea50aca6543df1c7cf6fa9eec0fd6
parent    37773383c0ea27a934b9e97788e07276cbaaadcf (diff)
Fix multiple workload runs

Change reporting so that multiple workloads in one job execution can be
reported instead of overwriting the previous value. Change the daily job
to use a single, multiple-workload run.

Change-Id: I8e350350ae13d2272b584af7a60ad269de160587
JIRA: STORPERF-98
Signed-off-by: mbeierl <mark.beierl@dell.com>
-rwxr-xr-x  ci/daily.sh                                | 36
-rw-r--r--  storperf/test_executor.py                  |  6
-rw-r--r--  storperf/utilities/data_handler.py         | 19
-rw-r--r--  storperf/utilities/thread_gate.py          | 60
-rw-r--r--  tests/utilities_tests/data_handler_test.py | 11
5 files changed, 73 insertions(+), 59 deletions(-)
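
Taken together, the diffs below stop each workload pass from overwriting the
last one: report_data and steady_state in the executor metadata become dicts
keyed by a per-workload name. A minimal sketch of the resulting layout, with
the key taken from the updated tests and the metric values purely illustrative:

# Sketch of executor.metadata after this commit (values illustrative).
metadata = {
    'report_data': {
        'rw.queue-depth.8.block-size.8192': {
            'lat.mean': {
                'read': {
                    'slope': 0.0,    # illustrative numbers only
                    'range': 0.0,
                    'average': 0.0,
                    'series': [],
                },
            },
        },
    },
    'steady_state': {
        'rw.queue-depth.8.block-size.8192': True,
    },
}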
diff --git a/ci/daily.sh b/ci/daily.sh
index e3b64cc..1e77d67 100755
--- a/ci/daily.sh
+++ b/ci/daily.sh
@@ -96,32 +96,22 @@ do
| awk '/Status/ {print $2}' | sed 's/"//g'`
done
-for WORKLOAD in ws wr rs rr rw
+export QUEUE_DEPTH=1,2,8
+export BLOCK_SIZE=2048,8192,16384
+export WORKLOAD=ws,wr,rs,rr,rw
+export SCENARIO_NAME="${CINDER_BACKEND}_${WORKLOAD}"
+
+JOB=`$WORKSPACE/ci/start_job.sh \
+ | awk '/job_id/ {print $2}' | sed 's/"//g'`
+JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
+ | awk '/Status/ {print $2}' | sed 's/"//g'`
+while [ "$JOB_STATUS" != "Completed" ]
do
- for BLOCK_SIZE in 2048 8192 16384
- do
- for QUEUE_DEPTH in 1 2 8
- do
- export QUEUE_DEPTH
- export BLOCK_SIZE
- export WORKLOAD
- export SCENARIO_NAME="${CINDER_BACKEND}_${WORKLOAD}"
-
- JOB=`$WORKSPACE/ci/start_job.sh \
- | awk '/job_id/ {print $2}' | sed 's/"//g'`
- JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
- | awk '/Status/ {print $2}' | sed 's/"//g'`
- while [ "$JOB_STATUS" != "Completed" ]
- do
- sleep 60
- JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
- | awk '/Status/ {print $2}' | sed 's/"//g'`
- done
- done
- done
+ sleep 60
+ JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
+ | awk '/Status/ {print $2}' | sed 's/"//g'`
done
-
echo "Deleting stack for cleanup"
curl -X DELETE --header 'Accept: application/json' 'http://127.0.0.1:5000/api/v1.0/configurations'
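
With the nested loops gone, daily.sh starts one job that sweeps all queue
depths, block sizes, and workloads, then polls the status endpoint until it
reports Completed. A hedged Python equivalent of that polling loop (the
script itself uses curl; the requests library and the helper name here are
assumptions):

# Hypothetical Python version of the curl polling loop above.
import time
import requests

def wait_for_job(job_id, base_url='http://127.0.0.1:5000/api/v1.0'):
    # Poll the StorPerf jobs API once a minute until the job is done.
    while True:
        resp = requests.get(base_url + '/jobs',
                            params={'id': job_id, 'type': 'status'})
        if resp.json().get('Status') == 'Completed':
            return
        time.sleep(60)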
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 9c9393f..cda6c78 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -177,9 +177,9 @@ class TestExecutor(object):
def terminate(self):
self._terminated = True
self.end_time = time.time()
- return self._terminate_current_run()
+ return self.terminate_current_run()
- def _terminate_current_run(self):
+ def terminate_current_run(self):
self.logger.info("Terminating current run")
terminated_hosts = []
for workload in self._workload_executors:
@@ -243,7 +243,7 @@ class TestExecutor(object):
if self.deadline is not None \
and not workload_name.startswith("_"):
event = scheduler.enter(self.deadline * 60, 1,
- self._terminate_current_run,
+ self.terminate_current_run,
())
t = Thread(target=scheduler.run, args=())
t.start()
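
The rename from _terminate_current_run to terminate_current_run makes the
method part of the class's public surface, so the data handler (below) can end
just the current workload on steady state while terminate() still ends the
whole job. The deadline path uses the stdlib sched module; a minimal sketch of
that pattern, with the delay value made up:

# Sketch of the deadline mechanism used above (stdlib sched).
import sched
import time
from threading import Thread

scheduler = sched.scheduler(time.time, time.sleep)
deadline_minutes = 30  # illustrative; StorPerf uses self.deadline

def on_deadline():
    # Stand-in for executor.terminate_current_run: ends only the
    # current run, not the whole job.
    print('terminate current run')

event = scheduler.enter(deadline_minutes * 60, 1, on_deadline, ())
Thread(target=scheduler.run).start()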
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
index ebc1bfd..0aae3b1 100644
--- a/storperf/utilities/data_handler.py
+++ b/storperf/utilities/data_handler.py
@@ -17,6 +17,7 @@ from storperf.utilities import math as math
from storperf.utilities import steady_state as SteadyState
from time import sleep
import time
+import json
class DataHandler(object):
@@ -61,13 +62,21 @@ class DataHandler(object):
if not steady:
steady_state = False
- executor.metadata['report_data'] = metrics
- executor.metadata['steady_state'] = steady_state
+ workload = '.'.join(executor.current_workload.split('.')[1:6])
+
+ if 'report_data' not in executor.metadata:
+ executor.metadata['report_data'] = {}
+
+ if 'steady_state' not in executor.metadata:
+ executor.metadata['steady_state'] = {}
+
+ executor.metadata['report_data'][workload] = metrics
+ executor.metadata['steady_state'][workload] = steady_state
workload_name = executor.current_workload.split('.')[1]
if steady_state and not workload_name.startswith('_'):
- executor.terminate()
+ executor.terminate_current_run()
def _lookup_prior_data(self, executor, metric, io_type):
workload = executor.current_workload
@@ -112,7 +121,7 @@ class DataHandler(object):
duration = latest_timestamp - earliest_timestamp
if (duration < 60 * self.samples):
self.logger.debug("Only %s minutes of samples, ignoring" %
- (duration / 60,))
+ ((duration / 60 + 1),))
return False
return SteadyState.steady_state(data_series)
@@ -160,6 +169,6 @@ class DataHandler(object):
scenario,
criteria,
build_tag,
- payload)
+ json.dumps(payload))
except:
self.logger.exception("Error pushing results into Database")
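
Two details worth noting in this diff: the per-workload key is built by
slicing the dotted workload name, and the payload is now JSON-serialized
before being pushed to the results database. A small sketch of the key
derivation (the leading component of current_workload is not shown in this
diff, so the example value is assumed):

# Assumed shape of current_workload; only fields [1:6] matter here.
current_workload = 'job-0001.rw.queue-depth.8.block-size.8192'
workload = '.'.join(current_workload.split('.')[1:6])
assert workload == 'rw.queue-depth.8.block-size.8192'  # the key the tests use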
diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py
index 295b8be..38acbb1 100644
--- a/storperf/utilities/thread_gate.py
+++ b/storperf/utilities/thread_gate.py
@@ -12,6 +12,7 @@ number of callers.
"""
import logging
import time
+from threading import Lock
class FailureToReportException(Exception):
@@ -26,6 +27,7 @@ class ThreadGate(object):
self._timeout = timeout
self._registrants = {}
self._creation_time = time.time()
+ self._lock = Lock()
"""
Calling this method returns a true or false, indicating that enough
@@ -33,31 +35,33 @@ class ThreadGate(object):
"""
def report(self, gate_id):
- now = time.time()
- self._registrants[gate_id] = now
- ready = True
- self.logger.debug("Gate report for %s", gate_id)
-
- total_missing = self._gate_size - len(self._registrants)
- if total_missing > 0:
- self.logger.debug("Not all registrants have reported in")
- time_since_creation = now - self._creation_time
- if (time_since_creation > (self._timeout * 2)):
- self.logger.error("%s registrant(s) have never reported in",
- total_missing)
- raise FailureToReportException
- return False
-
- for k, v in self._registrants.items():
- time_since_last_report = now - v
- if time_since_last_report > self._timeout:
- self.logger.debug("Registrant %s last reported %s ago",
- k, time_since_last_report)
- ready = False
-
- self.logger.debug("Gate pass? %s", ready)
-
- if ready:
- self._registrants.clear()
-
- return ready
+ with self._lock:
+ now = time.time()
+ self._registrants[gate_id] = now
+ ready = True
+ self.logger.debug("Gate report for %s", gate_id)
+
+ total_missing = self._gate_size - len(self._registrants)
+ if total_missing > 0:
+ self.logger.debug("Not all registrants have reported in")
+ time_since_creation = now - self._creation_time
+ if (time_since_creation > (self._timeout * 2)):
+ self.logger.error(
+ "%s registrant(s) have never reported in",
+ total_missing)
+ raise FailureToReportException
+ return False
+
+ for k, v in self._registrants.items():
+ time_since_last_report = now - v
+ if time_since_last_report > self._timeout:
+ self.logger.debug("Registrant %s last reported %s ago",
+ k, time_since_last_report)
+ ready = False
+
+ self.logger.debug("Gate pass? %s", ready)
+
+ if ready:
+ self._registrants.clear()
+
+ return ready
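
Wrapping report() in a Lock makes it safe for the multiple workload threads
introduced by this change to report concurrently; without it, two threads
could interleave between updating _registrants and clearing it. A usage
sketch, with the constructor signature assumed from the attributes set in
__init__:

# Hypothetical usage; ThreadGate(size, timeout) is assumed from the
# _gate_size/_timeout attributes referenced above.
from threading import Thread

gate = ThreadGate(2, timeout=300)

def worker(gate_id):
    # report() returns True only once every registrant has checked
    # in within the timeout; the new Lock serializes these calls.
    if gate.report(gate_id):
        print('gate opened by registrant %s' % gate_id)

for i in range(2):
    Thread(target=worker, args=(i,)).start()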
diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py
index b175c87..8115c6d 100644
--- a/tests/utilities_tests/data_handler_test.py
+++ b/tests/utilities_tests/data_handler_test.py
@@ -57,6 +57,9 @@ class DataHandlerTest(unittest.TestCase):
def terminate(self):
self._terminated = True
+ def terminate_current_run(self):
+ self._terminated = True
+
@mock.patch("time.time")
@mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
@mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
@@ -163,18 +166,22 @@ class DataHandlerTest(unittest.TestCase):
self.assertEqual(False, self._terminated)
self.assertEqual(expected_slope, self.metadata['report_data']
+ ['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['slope'])
self.assertEqual(expected_range, self.metadata['report_data']
+ ['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['range'])
self.assertEqual(expected_average, self.metadata['report_data']
+ ['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['average'])
self.assertEqual(series, self.metadata['report_data']
+ ['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['series'])
@@ -211,18 +218,22 @@ class DataHandlerTest(unittest.TestCase):
self.data_handler.data_event(self)
self.assertEqual(expected_slope, self.metadata['report_data']
+ ['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['slope'])
self.assertEqual(expected_range, self.metadata['report_data']
+ ['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['range'])
self.assertEqual(expected_average, self.metadata['report_data']
+ ['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['average'])
self.assertEqual(series, self.metadata['report_data']
+ ['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['series'])