diff options
Diffstat (limited to 'yardstick/benchmark/runners/base.py')
-rwxr-xr-x | yardstick/benchmark/runners/base.py | 71 |
1 files changed, 57 insertions, 14 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index a887fa5b3..94de45d1e 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -12,24 +12,22 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +# +# This is a modified copy of ``rally/rally/benchmark/runners/base.py`` -# yardstick comment: this is a modified copy of -# rally/rally/benchmark/runners/base.py - -from __future__ import absolute_import - +import importlib import logging import multiprocessing import subprocess import time import traceback -import importlib - -from six.moves.queue import Empty +from six import moves -import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario +from yardstick.common import utils +from yardstick.dispatcher.base import Base as DispatcherBase + log = logging.getLogger(__name__) @@ -39,7 +37,7 @@ def _execute_shell_command(command): exitcode = 0 try: output = subprocess.check_output(command, shell=True) - except Exception: + except subprocess.CalledProcessError: exitcode = -1 output = traceback.format_exc() log.error("exec command '%s' error:\n ", command) @@ -79,6 +77,33 @@ def _periodic_action(interval, command, queue): queue.put({'periodic-action-data': data}) +class ScenarioOutput(dict): + + QUEUE_PUT_TIMEOUT = 10 + + def __init__(self, queue, **kwargs): + super(ScenarioOutput, self).__init__() + self._queue = queue + self.result_ext = dict() + for key, val in kwargs.items(): + self.result_ext[key] = val + setattr(self, key, val) + + def push(self, data=None, add_timestamp=True): + if data is None: + data = dict(self) + + if add_timestamp: + result = {'timestamp': time.time(), 'data': data} + else: + result = data + + for key in self.result_ext.keys(): + result[key] = getattr(self, key) + + self._queue.put(result, True, self.QUEUE_PUT_TIMEOUT) + + class Runner(object): runners = [] @@ -119,7 +144,7 @@ class Runner(object): @staticmethod def terminate_all(): """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners", exc_info=True) + log.debug("Terminating all runners") # release dumper process as some errors before any runner is created if not Runner.runners: @@ -137,6 +162,8 @@ class Runner(object): Runner.release(runner) def __init__(self, config): + self.task_id = None + self.case_name = None self.config = config self.periodic_action_process = None self.output_queue = multiprocessing.Queue() @@ -170,6 +197,8 @@ class Runner(object): cls = getattr(module, path_split[-1]) self.config['object'] = class_name + self.case_name = scenario_cfg['tc'] + self.task_id = scenario_cfg['task_id'] self.aborted.clear() # run a potentially configured pre-start action @@ -239,16 +268,30 @@ class Runner(object): log.debug("output_queue size %s", self.output_queue.qsize()) try: result.update(self.output_queue.get(True, 1)) - except Empty: + except moves.queue.Empty: pass return result def get_result(self): result = [] + + dispatcher = self.config['output_config']['DEFAULT']['dispatcher'] + output_in_influxdb = 'influxdb' in dispatcher + while not self.result_queue.empty(): log.debug("result_queue size %s", self.result_queue.qsize()) try: - result.append(self.result_queue.get(True, 1)) - except Empty: + one_record = self.result_queue.get(True, 1) + except moves.queue.Empty: pass + else: + if output_in_influxdb: + self._output_to_influxdb(one_record) + + result.append(one_record) return result + + def _output_to_influxdb(self, record): + dispatchers = DispatcherBase.get(self.config['output_config']) + dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb')) + dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id) |