aboutsummaryrefslogtreecommitdiffstats
path: root/docker/core/Try-to-detect-the-race-conditions.patch
blob: 9413d761722453c5d3eba6443f52bd5f7b64ae14 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
From 304497b81fbbe9cb8608b947cae76aeaa2b0934e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= <cedric.ollivier@orange.com>
Date: Wed, 3 Jun 2020 15:23:59 +0200
Subject: [PATCH 11/11] Try to detect the race conditions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Change-Id: I582933832e23d188c7fa5999e713dd5d7e82d2da
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
---
 rally/cli/main.py    |  5 ++++-
 rally/task/runner.py | 23 ++++++++++++++++++-----
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/rally/cli/main.py b/rally/cli/main.py
index 235a57113..14c057c0e 100644
--- a/rally/cli/main.py
+++ b/rally/cli/main.py
@@ -15,6 +15,10 @@
 
 """CLI interface for Rally."""
 
+STACK_SIZE = 1024 * 1024
+import threading
+threading.stack_size(STACK_SIZE)
+
 import sys
 
 from rally.cli import cliutils
@@ -25,7 +29,6 @@ from rally.cli.commands import plugin
 from rally.cli.commands import task
 from rally.cli.commands import verify
 
-
 categories = {
     "db": db.DBCommands,
     "env": env.EnvCommands,
diff --git a/rally/task/runner.py b/rally/task/runner.py
index 3397e1193..b2fde8550 100644
--- a/rally/task/runner.py
+++ b/rally/task/runner.py
@@ -17,6 +17,7 @@ import abc
 import collections
 import copy
 import multiprocessing
+import threading
 import time
 
 from rally.common import logging
@@ -51,10 +52,14 @@ def _get_scenario_context(iteration, context_obj):
 def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
                        event_queue):
     iteration = context_obj["iteration"]
+    LOG.info("DEBUGRACE %s putting in event_queue iteration: %s",
+             threading.get_native_id(), iteration)
     event_queue.put({
         "type": "iteration",
         "value": iteration,
     })
+    LOG.info("DEBUGRACE %s put in event_queue iteration: %s",
+             threading.get_native_id(), iteration)
 
     # provide arguments isolation between iterations
     scenario_kwargs = copy.deepcopy(scenario_kwargs)
@@ -65,6 +70,8 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
     scenario_inst = cls(context_obj)
     error = []
     try:
+        LOG.info("DEBUGRACE %s running %s %s",
+                 threading.get_native_id(), scenario_inst, scenario_inst)
         with rutils.Timer() as timer:
             getattr(scenario_inst, method_name)(**scenario_kwargs)
     except Exception as e:
@@ -87,8 +94,14 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
 
 def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs,
                    event_queue):
-    queue.put(_run_scenario_once(cls, method_name, context_obj,
-                                 scenario_kwargs, event_queue))
+    result = _run_scenario_once(cls, method_name, context_obj,
+                                scenario_kwargs, event_queue)
+    LOG.info("DEBUGRACE %s putting in result_queue context_obj: %s",
+             threading.get_native_id(),
+             context_obj)
+    queue.put(result)
+    LOG.info("DEBUGRACE %s put in result_queue context_obj: %s: %s",
+             threading.get_native_id(), context_obj, result)
 
 
 def _log_worker_info(**info):
@@ -186,9 +199,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
         for i in range(processes_to_start):
             kwrgs = {"processes_to_start": processes_to_start,
                      "processes_counter": i}
-            process = multiprocessing.Process(target=worker_process,
-                                              args=next(worker_args_gen),
-                                              kwargs={"info": kwrgs})
+            process = threading.Thread(target=worker_process,
+                                       args=next(worker_args_gen),
+                                       kwargs={"info": kwrgs})
             process.start()
             process_pool.append(process)
 
-- 
2.26.2