From 265cc0dfdeb9f65f8641fc5b60900b6ee1315f5a Mon Sep 17 00:00:00 2001 From: Cédric Ollivier Date: Mon, 1 Jun 2020 17:06:29 +0200 Subject: Switch to SimpleQueue() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I1f3cb636813b9c3c10b5c829e35bbdeea02a318c Signed-off-by: Cédric Ollivier --- .../core/Try-to-detect-the-race-conditions.patch | 70 +++++++++++++++++----- 1 file changed, 56 insertions(+), 14 deletions(-) diff --git a/docker/core/Try-to-detect-the-race-conditions.patch b/docker/core/Try-to-detect-the-race-conditions.patch index 50d034418..6c9d025c5 100644 --- a/docker/core/Try-to-detect-the-race-conditions.patch +++ b/docker/core/Try-to-detect-the-race-conditions.patch @@ -1,4 +1,4 @@ -From 92cf158d8932f4509983b3813049be717093253e Mon Sep 17 00:00:00 2001 +From 41256d0983cda2914948898c5d4a0f74fa161dac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= Date: Thu, 30 Apr 2020 13:59:24 +0200 Subject: [PATCH] Try to detect the race conditions @@ -9,13 +9,14 @@ Content-Transfer-Encoding: 8bit Change-Id: I9b468ec1cf79e0a66abeb1fb48f5f0f067c2c198 Signed-off-by: Cédric Ollivier --- - rally/cli/main.py | 6 ++++ - rally/plugins/task/runners/constant.py | 30 ++++++++++++++++++- - .../task/scenarios/requests/http_requests.py | 9 ++++++ - .../plugins/task/scenarios/requests/utils.py | 9 ++++++ - rally/task/runner.py | 27 +++++++++++++++-- - rally/task/utils.py | 15 ++++++++++ - 6 files changed, 93 insertions(+), 3 deletions(-) + rally/cli/main.py | 6 +++ + rally/plugins/task/runners/constant.py | 38 ++++++++++++++++--- + rally/plugins/task/runners/rps.py | 4 +- + .../task/scenarios/requests/http_requests.py | 9 +++++ + .../plugins/task/scenarios/requests/utils.py | 9 +++++ + rally/task/runner.py | 29 ++++++++++++-- + rally/task/utils.py | 15 ++++++++ + 7 files changed, 99 insertions(+), 11 deletions(-) diff --git a/rally/cli/main.py b/rally/cli/main.py index 235a57113..d931924d8 100644 @@ -35,7 +36,7 @@ index 235a57113..d931924d8 100644 from rally.cli import cliutils diff --git a/rally/plugins/task/runners/constant.py b/rally/plugins/task/runners/constant.py -index 5feb1fee1..38a01e28e 100644 +index 5feb1fee1..e872b768b 100644 --- a/rally/plugins/task/runners/constant.py +++ b/rally/plugins/task/runners/constant.py @@ -24,6 +24,10 @@ from rally.common import validation @@ -109,11 +110,48 @@ index 5feb1fee1..38a01e28e 100644 if timeout: timeout_queue.put((None, None,)) +@@ -229,8 +256,8 @@ class ConstantScenarioRunner(runner.ScenarioRunner): + concurrency_per_worker=concurrency_per_worker, + concurrency_overhead=concurrency_overhead) + +- result_queue = multiprocessing.Queue() +- event_queue = multiprocessing.Queue() ++ result_queue = multiprocessing.SimpleQueue() ++ event_queue = multiprocessing.SimpleQueue() + + def worker_args_gen(concurrency_overhead): + while True: +@@ -324,8 +351,8 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner): + concurrency_per_worker=concurrency_per_worker, + concurrency_overhead=concurrency_overhead) + +- result_queue = multiprocessing.Queue() +- event_queue = multiprocessing.Queue() ++ result_queue = multiprocessing.SimpleQueue() ++ event_queue = multiprocessing.SimpleQueue() + + def worker_args_gen(concurrency_overhead): + while True: @@ -340,3 +367,4 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner): processes_to_start, _worker_process, worker_args_gen(concurrency_overhead)) self._join_processes(process_pool, result_queue, event_queue) + +diff --git a/rally/plugins/task/runners/rps.py b/rally/plugins/task/runners/rps.py +index 98a706d11..74ccfae94 100644 +--- a/rally/plugins/task/runners/rps.py ++++ b/rally/plugins/task/runners/rps.py +@@ -260,8 +260,8 @@ class RPSScenarioRunner(runner.ScenarioRunner): + concurrency_per_worker=concurrency_per_worker, + concurrency_overhead=concurrency_overhead) + +- result_queue = multiprocessing.Queue() +- event_queue = multiprocessing.Queue() ++ result_queue = multiprocessing.SimpleQueue() ++ event_queue = multiprocessing.SimpleQueue() + + def worker_args_gen(times_overhead, concurrency_overhead): + """Generate arguments for process worker. diff --git a/rally/plugins/task/scenarios/requests/http_requests.py b/rally/plugins/task/scenarios/requests/http_requests.py index e85ee5af2..4afdf29f1 100644 --- a/rally/plugins/task/scenarios/requests/http_requests.py @@ -169,7 +207,7 @@ index 8fd35347a..cd69900a5 100644 if status_code != resp.status_code: error_msg = "Expected HTTP request code is `%s` actual `%s`" diff --git a/rally/task/runner.py b/rally/task/runner.py -index 3397e1193..57f9428f4 100644 +index 3397e1193..295558f44 100644 --- a/rally/task/runner.py +++ b/rally/task/runner.py @@ -87,6 +87,11 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs, @@ -193,7 +231,7 @@ index 3397e1193..57f9428f4 100644 process = multiprocessing.Process(target=worker_process, args=next(worker_args_gen), kwargs={"info": kwrgs}) -@@ -202,18 +209,26 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, +@@ -202,22 +209,28 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, :param event_queue: multiprocessing.Queue that receives the events """ while process_pool: @@ -221,8 +259,12 @@ index 3397e1193..57f9428f4 100644 + self._send_result(col_result_queue) self._flush_results() - result_queue.close() -@@ -245,8 +260,13 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, +- result_queue.close() +- event_queue.close() + + def _flush_results(self): + if self.result_batch: +@@ -245,8 +258,13 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, if len(self.result_batch) >= self.batch_size: sorted_batch = sorted(self.result_batch, key=lambda r: result["timestamp"]) @@ -236,7 +278,7 @@ index 3397e1193..57f9428f4 100644 def send_event(self, type, value=None): """Store event to send it to consumer later. -@@ -254,6 +274,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, +@@ -254,6 +272,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, :param type: Event type :param value: Optional event data """ -- cgit 1.2.3-korg