1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
From 304497b81fbbe9cb8608b947cae76aeaa2b0934e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= <cedric.ollivier@orange.com>
Date: Wed, 3 Jun 2020 15:23:59 +0200
Subject: [PATCH 11/11] Try to detect the race conditions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Change-Id: I582933832e23d188c7fa5999e713dd5d7e82d2da
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
---
rally/cli/main.py | 5 ++++-
rally/task/runner.py | 23 ++++++++++++++++++-----
2 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/rally/cli/main.py b/rally/cli/main.py
index 235a57113..14c057c0e 100644
--- a/rally/cli/main.py
+++ b/rally/cli/main.py
@@ -15,6 +15,10 @@
"""CLI interface for Rally."""
+STACK_SIZE = 1024 * 1024
+import threading
+threading.stack_size(STACK_SIZE)
+
import sys
from rally.cli import cliutils
@@ -25,7 +29,6 @@ from rally.cli.commands import plugin
from rally.cli.commands import task
from rally.cli.commands import verify
-
categories = {
"db": db.DBCommands,
"env": env.EnvCommands,
diff --git a/rally/task/runner.py b/rally/task/runner.py
index 3397e1193..b2fde8550 100644
--- a/rally/task/runner.py
+++ b/rally/task/runner.py
@@ -17,6 +17,7 @@ import abc
import collections
import copy
import multiprocessing
+import threading
import time
from rally.common import logging
@@ -51,10 +52,14 @@ def _get_scenario_context(iteration, context_obj):
def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
event_queue):
iteration = context_obj["iteration"]
+ LOG.info("DEBUGRACE %s putting in event_queue iteration: %s",
+ threading.get_native_id(), iteration)
event_queue.put({
"type": "iteration",
"value": iteration,
})
+ LOG.info("DEBUGRACE %s put in event_queue iteration: %s",
+ threading.get_native_id(), iteration)
# provide arguments isolation between iterations
scenario_kwargs = copy.deepcopy(scenario_kwargs)
@@ -65,6 +70,8 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
scenario_inst = cls(context_obj)
error = []
try:
+ LOG.info("DEBUGRACE %s running %s %s",
+ threading.get_native_id(), scenario_inst, scenario_inst)
with rutils.Timer() as timer:
getattr(scenario_inst, method_name)(**scenario_kwargs)
except Exception as e:
@@ -87,8 +94,14 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs,
event_queue):
- queue.put(_run_scenario_once(cls, method_name, context_obj,
- scenario_kwargs, event_queue))
+ result = _run_scenario_once(cls, method_name, context_obj,
+ scenario_kwargs, event_queue)
+ LOG.info("DEBUGRACE %s putting in result_queue context_obj: %s",
+ threading.get_native_id(),
+ context_obj)
+ queue.put(result)
+ LOG.info("DEBUGRACE %s put in result_queue context_obj: %s: %s",
+ threading.get_native_id(), context_obj, result)
def _log_worker_info(**info):
@@ -186,9 +199,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
for i in range(processes_to_start):
kwrgs = {"processes_to_start": processes_to_start,
"processes_counter": i}
- process = multiprocessing.Process(target=worker_process,
- args=next(worker_args_gen),
- kwargs={"info": kwrgs})
+ process = threading.Thread(target=worker_process,
+ args=next(worker_args_gen),
+ kwargs={"info": kwrgs})
process.start()
process_pool.append(process)
--
2.26.2
|