From 304497b81fbbe9cb8608b947cae76aeaa2b0934e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= Date: Wed, 3 Jun 2020 15:23:59 +0200 Subject: [PATCH 11/11] Try to detect the race conditions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I582933832e23d188c7fa5999e713dd5d7e82d2da Signed-off-by: Cédric Ollivier --- rally/cli/main.py | 5 ++++- rally/task/runner.py | 23 ++++++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/rally/cli/main.py b/rally/cli/main.py index 235a57113..14c057c0e 100644 --- a/rally/cli/main.py +++ b/rally/cli/main.py @@ -15,6 +15,10 @@ """CLI interface for Rally.""" +STACK_SIZE = 1024 * 1024 +import threading +threading.stack_size(STACK_SIZE) + import sys from rally.cli import cliutils @@ -25,7 +29,6 @@ from rally.cli.commands import plugin from rally.cli.commands import task from rally.cli.commands import verify - categories = { "db": db.DBCommands, "env": env.EnvCommands, diff --git a/rally/task/runner.py b/rally/task/runner.py index 3397e1193..b2fde8550 100644 --- a/rally/task/runner.py +++ b/rally/task/runner.py @@ -17,6 +17,7 @@ import abc import collections import copy import multiprocessing +import threading import time from rally.common import logging @@ -51,10 +52,14 @@ def _get_scenario_context(iteration, context_obj): def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs, event_queue): iteration = context_obj["iteration"] + LOG.info("DEBUGRACE %s putting in event_queue iteration: %s", + threading.get_native_id(), iteration) event_queue.put({ "type": "iteration", "value": iteration, }) + LOG.info("DEBUGRACE %s put in event_queue iteration: %s", + threading.get_native_id(), iteration) # provide arguments isolation between iterations scenario_kwargs = copy.deepcopy(scenario_kwargs) @@ -65,6 +70,8 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs, scenario_inst = cls(context_obj) error = [] try: + LOG.info("DEBUGRACE %s running %s %s", + threading.get_native_id(), scenario_inst, scenario_inst) with rutils.Timer() as timer: getattr(scenario_inst, method_name)(**scenario_kwargs) except Exception as e: @@ -87,8 +94,14 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs, def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs, event_queue): - queue.put(_run_scenario_once(cls, method_name, context_obj, - scenario_kwargs, event_queue)) + result = _run_scenario_once(cls, method_name, context_obj, + scenario_kwargs, event_queue) + LOG.info("DEBUGRACE %s putting in result_queue context_obj: %s", + threading.get_native_id(), + context_obj) + queue.put(result) + LOG.info("DEBUGRACE %s put in result_queue context_obj: %s: %s", + threading.get_native_id(), context_obj, result) def _log_worker_info(**info): @@ -186,9 +199,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, for i in range(processes_to_start): kwrgs = {"processes_to_start": processes_to_start, "processes_counter": i} - process = multiprocessing.Process(target=worker_process, - args=next(worker_args_gen), - kwargs={"info": kwrgs}) + process = threading.Thread(target=worker_process, + args=next(worker_args_gen), + kwargs={"info": kwrgs}) process.start() process_pool.append(process) -- 2.26.2