Diffstat (limited to 'yardstick/benchmark/runners/base.py')
-rwxr-xr-x	yardstick/benchmark/runners/base.py	74
1 file changed, 10 insertions, 64 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index ebb9a91b5..f6816c7ed 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -22,46 +22,13 @@ import logging
import multiprocessing
import subprocess
import time
-import os
import traceback
-from oslo_config import cfg
-
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
-from yardstick.dispatcher.base import Base as DispatcherBase
log = logging.getLogger(__name__)
-CONF = cfg.CONF
-
-
-def _output_serializer_main(filename, queue, config):
- """entrypoint for the singleton subprocess writing to outfile
- Use of this process enables multiple instances of a scenario without
- messing up the output file.
- """
- try:
- out_type = config['yardstick'].get('DEFAULT', {})['dispatcher']
- except KeyError:
- out_type = os.environ.get('DISPATCHER', 'file')
-
- conf = {
- 'type': out_type.capitalize(),
- 'file_path': filename
- }
-
- dispatcher = DispatcherBase.get(conf, config)
-
- while True:
- # blocks until data becomes available
- record = queue.get()
- if record == '_TERMINATE_':
- dispatcher.flush_result_data()
- break
- else:
- dispatcher.record_result_data(record)
-
def _execute_shell_command(command):
"""execute shell script with error handling"""
@@ -110,8 +77,6 @@ def _periodic_action(interval, command, queue):
class Runner(object):
- queue = None
- dump_process = None
runners = []
@staticmethod
@@ -131,30 +96,10 @@ class Runner(object):
return types
@staticmethod
- def get(runner_cfg, config):
+ def get(runner_cfg):
"""Returns instance of a scenario runner for execution type.
"""
- # if there is no runner, start the output serializer subprocess
- if not Runner.runners:
- log.debug("Starting dump process file '%s'",
- runner_cfg["output_filename"])
- Runner.queue = multiprocessing.Queue()
- Runner.dump_process = multiprocessing.Process(
- target=_output_serializer_main,
- name="Dumper",
- args=(runner_cfg["output_filename"], Runner.queue, config))
- Runner.dump_process.start()
-
- return Runner.get_cls(runner_cfg["type"])(runner_cfg, Runner.queue)
-
- @staticmethod
- def release_dump_process():
- """Release the dumper process"""
- log.debug("Stopping dump process")
- if Runner.dump_process:
- Runner.queue.put('_TERMINATE_')
- Runner.dump_process.join()
- Runner.dump_process = None
+ return Runner.get_cls(runner_cfg["type"])(runner_cfg)
@staticmethod
def release(runner):
@@ -162,10 +107,6 @@ class Runner(object):
if runner in Runner.runners:
Runner.runners.remove(runner)
- # if this was the last runner, stop the output serializer subprocess
- if not Runner.runners:
- Runner.release_dump_process()
-
@staticmethod
def terminate(runner):
"""Terminate the runner"""
@@ -179,7 +120,6 @@ class Runner(object):
# release dumper process as some errors before any runner is created
if not Runner.runners:
- Runner.release_dump_process()
return
for runner in Runner.runners:
@@ -193,11 +133,11 @@ class Runner(object):
runner.periodic_action_process = None
Runner.release(runner)
- def __init__(self, config, queue):
+ def __init__(self, config):
self.config = config
self.periodic_action_process = None
- self.result_queue = queue
self.output_queue = multiprocessing.Queue()
+ self.result_queue = multiprocessing.Queue()
self.process = None
self.aborted = multiprocessing.Event()
Runner.runners.append(self)
@@ -276,3 +216,9 @@ class Runner(object):
while not self.output_queue.empty():
result.update(self.output_queue.get())
return result
+
+ def get_result(self):
+ result = []
+ while not self.result_queue.empty():
+ result.append(self.result_queue.get())
+ return result
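
Taken together, the patch removes the shared output-serializer subprocess: each Runner instance now owns its own result_queue and exposes get_result() to drain it, so callers collect records directly from the runner. A minimal sketch of the new calling pattern follows (not part of the patch; the runner type and config keys are illustrative, and the scenario execution step is omitted):

    from yardstick.benchmark.runners import base

    runner_cfg = {"type": "Iteration"}      # illustrative config; real configs carry more keys
    runner = base.Runner.get(runner_cfg)    # no global dump process is started any more

    # ... run the scenario so the worker process pushes records onto result_queue ...

    records = runner.get_result()           # drains runner.result_queue into a list of records
    base.Runner.release(runner)             # unregisters the runner; no dumper to shut down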