aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark/runners/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'yardstick/benchmark/runners/base.py')
-rwxr-xr-xyardstick/benchmark/runners/base.py44
1 files changed, 31 insertions, 13 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index fbdf6c281..af2557441 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -12,27 +12,26 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
import logging
import multiprocessing
import subprocess
import time
import traceback
-from subprocess import CalledProcessError
-
-import importlib
-from six.moves.queue import Empty
+from six import moves
-import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.common import utils
from yardstick.dispatcher.base import Base as DispatcherBase
+
log = logging.getLogger(__name__)
@@ -41,7 +40,7 @@ def _execute_shell_command(command):
exitcode = 0
try:
output = subprocess.check_output(command, shell=True)
- except CalledProcessError:
+ except subprocess.CalledProcessError:
exitcode = -1
output = traceback.format_exc()
log.error("exec command '%s' error:\n ", command)
@@ -245,7 +244,7 @@ class Runner(object):
log.debug("output_queue size %s", self.output_queue.qsize())
try:
result.update(self.output_queue.get(True, 1))
- except Empty:
+ except moves.queue.Empty:
pass
return result
@@ -259,7 +258,7 @@ class Runner(object):
log.debug("result_queue size %s", self.result_queue.qsize())
try:
one_record = self.result_queue.get(True, 1)
- except Empty:
+ except moves.queue.Empty:
pass
else:
if output_in_influxdb:
@@ -272,3 +271,22 @@ class Runner(object):
dispatchers = DispatcherBase.get(self.config['output_config'])
dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
+
+
+class RunnerProducer(producer.MessagingProducer):
+ """Class implementing the message producer for runners"""
+
+ def __init__(self, _id):
+ super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
+
+ def start_iteration(self, version=1, data=None):
+ data = {} if not data else data
+ self.send_message(
+ messaging.RUNNER_METHOD_START_ITERATION,
+ payloads.RunnerPayload(version=version, data=data))
+
+ def stop_iteration(self, version=1, data=None):
+ data = {} if not data else data
+ self.send_message(
+ messaging.RUNNER_METHOD_STOP_ITERATION,
+ payloads.RunnerPayload(version=version, data=data))