aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark/runners/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'yardstick/benchmark/runners/base.py')
-rwxr-xr-xyardstick/benchmark/runners/base.py71
1 files changed, 57 insertions, 14 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index a887fa5b3..94de45d1e 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -12,24 +12,22 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
import logging
import multiprocessing
import subprocess
import time
import traceback
-import importlib
-
-from six.moves.queue import Empty
+from six import moves
-import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import utils
+from yardstick.dispatcher.base import Base as DispatcherBase
+
log = logging.getLogger(__name__)
@@ -39,7 +37,7 @@ def _execute_shell_command(command):
exitcode = 0
try:
output = subprocess.check_output(command, shell=True)
- except Exception:
+ except subprocess.CalledProcessError:
exitcode = -1
output = traceback.format_exc()
log.error("exec command '%s' error:\n ", command)
@@ -79,6 +77,33 @@ def _periodic_action(interval, command, queue):
queue.put({'periodic-action-data': data})
+class ScenarioOutput(dict):
+
+ QUEUE_PUT_TIMEOUT = 10
+
+ def __init__(self, queue, **kwargs):
+ super(ScenarioOutput, self).__init__()
+ self._queue = queue
+ self.result_ext = dict()
+ for key, val in kwargs.items():
+ self.result_ext[key] = val
+ setattr(self, key, val)
+
+ def push(self, data=None, add_timestamp=True):
+ if data is None:
+ data = dict(self)
+
+ if add_timestamp:
+ result = {'timestamp': time.time(), 'data': data}
+ else:
+ result = data
+
+ for key in self.result_ext.keys():
+ result[key] = getattr(self, key)
+
+ self._queue.put(result, True, self.QUEUE_PUT_TIMEOUT)
+
+
class Runner(object):
runners = []
@@ -119,7 +144,7 @@ class Runner(object):
@staticmethod
def terminate_all():
"""Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners", exc_info=True)
+ log.debug("Terminating all runners")
# release dumper process as some errors before any runner is created
if not Runner.runners:
@@ -137,6 +162,8 @@ class Runner(object):
Runner.release(runner)
def __init__(self, config):
+ self.task_id = None
+ self.case_name = None
self.config = config
self.periodic_action_process = None
self.output_queue = multiprocessing.Queue()
@@ -170,6 +197,8 @@ class Runner(object):
cls = getattr(module, path_split[-1])
self.config['object'] = class_name
+ self.case_name = scenario_cfg['tc']
+ self.task_id = scenario_cfg['task_id']
self.aborted.clear()
# run a potentially configured pre-start action
@@ -239,16 +268,30 @@ class Runner(object):
log.debug("output_queue size %s", self.output_queue.qsize())
try:
result.update(self.output_queue.get(True, 1))
- except Empty:
+ except moves.queue.Empty:
pass
return result
def get_result(self):
result = []
+
+ dispatcher = self.config['output_config']['DEFAULT']['dispatcher']
+ output_in_influxdb = 'influxdb' in dispatcher
+
while not self.result_queue.empty():
log.debug("result_queue size %s", self.result_queue.qsize())
try:
- result.append(self.result_queue.get(True, 1))
- except Empty:
+ one_record = self.result_queue.get(True, 1)
+ except moves.queue.Empty:
pass
+ else:
+ if output_in_influxdb:
+ self._output_to_influxdb(one_record)
+
+ result.append(one_record)
return result
+
+ def _output_to_influxdb(self, record):
+ dispatchers = DispatcherBase.get(self.config['output_config'])
+ dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
+ dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)